// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rootcoord

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"github.com/tidwall/gjson"
	"github.com/tikv/client-go/v2/txnkv"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/coordinator/snmanager"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/kv/tikv"
	"github.com/milvus-io/milvus/internal/metastore"
	kvmetastore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
	"github.com/milvus-io/milvus/internal/metastore/model"
	"github.com/milvus-io/milvus/internal/rootcoord/tombstone"
	streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
	tso2 "github.com/milvus-io/milvus/internal/tso"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/dependency"
	"github.com/milvus-io/milvus/internal/util/proxyutil"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/streamingutil"
	tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/kv"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/metrics"
	pb "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
	"github.com/milvus-io/milvus/pkg/v2/proto/proxypb"
	"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
	"github.com/milvus-io/milvus/pkg/v2/util"
	"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
	"github.com/milvus-io/milvus/pkg/v2/util/contextutil"
	"github.com/milvus-io/milvus/pkg/v2/util/expr"
	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/retry"
	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// UniqueID is an alias of typeutil.UniqueID.
type UniqueID = typeutil.UniqueID

// Timestamp is an alias of typeutil.Timestamp.
type Timestamp = typeutil.Timestamp

// InvalidCollectionID is the zero-valued UniqueID; going by its name it is the
// sentinel for "no valid collection".
const InvalidCollectionID = UniqueID(0)

// Params is the package-level handle to the component parameter table
// returned by paramtable.Get().
var Params *paramtable.ComponentParam = paramtable.Get()

// Opt is a functional option that mutates a Core.
type Opt func(*Core)

// metaKVCreator is a factory that produces the kv.MetaKv used for metadata
// storage. Core.initKVCreator installs a default one when none is set.
type metaKVCreator func() kv.MetaKv

// Core is the root coordinator core. It owns the metadata table, the ID and
// TSO allocators, the DDL scheduler, the proxy watcher/clients and the
// background loops started by startServerLoop.
type Core struct {
	// ctx and cancel are derived from the context given to NewCore; cancel is
	// invoked from Stop (via cancelIfNotNil).
	ctx              context.Context
	cancel           context.CancelFunc
	// wg tracks the goroutines launched by startServerLoop; Stop waits on it.
	wg               sync.WaitGroup
	// etcdCli and tikvCli are injected through SetEtcdClient / SetTiKVClient.
	etcdCli          *clientv3.Client
	tikvCli          *txnkv.Client
	// address is injected through SetAddress and logged by initInternal.
	address          string
	// meta is created by initMetaTable.
	meta             IMetaTable
	// scheduler, broker and ddlTsLockManager are created by initInternal.
	scheduler        IScheduler
	broker           Broker
	ddlTsLockManager DdlTsLockManager

	// metaKVCreator builds the metadata KV; initKVCreator fills it in when nil.
	metaKVCreator metaKVCreator

	// proxyCreator is set through SetProxyCreator (NewCore installs
	// proxyutil.DefaultProxyCreator); the watcher and client manager are
	// created by initInternal.
	proxyCreator       proxyutil.ProxyCreator
	proxyWatcher       *proxyutil.ProxyWatcher
	proxyClientManager proxyutil.ProxyClientManagerInterface

	metricsCacheManager *metricsinfo.MetricsCacheManager

	// chanTimeTick is created by initInternal and used to publish time ticks.
	chanTimeTick *timetickSync

	// idAllocator and tsoAllocator are created by initIDAllocator and
	// initTSOAllocator respectively.
	idAllocator  allocator.Interface
	tsoAllocator tso2.Allocator

	// mixCoord is injected through SetMixCoord; quotaCenter is created by
	// initInternal.
	mixCoord       types.MixCoord
	streamingCoord *streamingcoord.Server
	quotaCenter    *QuotaCenter

	// stateCode holds the current commonpb.StateCode as an int32.
	stateCode atomic.Int32
	// initOnce and startOnce make initInternal / startInternal run at most once.
	initOnce  sync.Once
	startOnce sync.Once
	// session is injected through SetSession.
	session   sessionutil.SessionInterface

	// factory is the dependency factory handed to NewCore.
	factory dependency.Factory

	activateFunc func() error

	// metricsRequest is created in NewCore; handlers are registered by
	// registerMetricsRequest.
	metricsRequest *metricsinfo.MetricsRequest

	// tombstoneSweeper is created by restore and closed by Stop.
	tombstoneSweeper tombstone.TombstoneSweeper
}

// --------------------- function --------------------------

// NewCore builds a rootcoord Core bound to a cancellable child of c.
// The returned core starts in the Abnormal state, uses the default proxy
// creator, and is registered with the expr package under the name "rootcoord".
// The error result is always nil.
func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
	coreCtx, cancelFn := context.WithCancel(c)
	rand.Seed(time.Now().UnixNano())

	core := new(Core)
	core.ctx = coreCtx
	core.cancel = cancelFn
	core.factory = factory
	core.metricsRequest = metricsinfo.NewMetricsRequest()

	core.UpdateStateCode(commonpb.StateCode_Abnormal)
	core.SetProxyCreator(proxyutil.DefaultProxyCreator)
	expr.Register("rootcoord", core)

	return core, nil
}

// UpdateStateCode stores the given state code and logs the transition.
func (c *Core) UpdateStateCode(code commonpb.StateCode) {
	c.stateCode.Store(int32(code))

	logger := log.Ctx(c.ctx)
	logger.Info("update rootcoord state", zap.String("state", code.String()))
}

// GetStateCode returns the state code most recently stored by UpdateStateCode.
func (c *Core) GetStateCode() commonpb.StateCode {
	raw := c.stateCode.Load()
	return commonpb.StateCode(raw)
}

// sendTimeTick publishes timestamp t for every DML channel currently listed by
// chanTimeTick. reason is forwarded unchanged to updateTimeTick.
func (c *Core) sendTimeTick(t Timestamp, reason string) error {
	channels := c.chanTimeTick.listDmlChannels()

	// Every channel carries the same timestamp.
	stamps := make([]uint64, len(channels))
	for idx := range stamps {
		stamps[idx] = t
	}

	base := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_TimeTick),
		commonpbutil.WithTimeStamp(t),
		commonpbutil.WithSourceID(ddlSourceID),
	)
	msg := &internalpb.ChannelTimeTickMsg{
		Base:             base,
		ChannelNames:     channels,
		Timestamps:       stamps,
		DefaultTimestamp: t,
	}
	return c.chanTimeTick.updateTimeTick(msg, reason)
}

// sendMinDdlTsAsTt sends the smaller of the two minimum DDL timestamps (from
// the ddl-ts lock manager and from the scheduler) as a time tick. It does
// nothing when TT messages are disabled, when the core is not Healthy, or
// when the computed timestamp is the zero or max sentinel.
func (c *Core) sendMinDdlTsAsTt() {
	if !paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() {
		return
	}

	logger := log.Ctx(c.ctx)
	if state := c.GetStateCode(); state != commonpb.StateCode_Healthy {
		logger.Warn("rootCoord is not healthy, skip send timetick")
		return
	}

	bgTs := c.ddlTsLockManager.GetMinDdlTs()
	normalTs := c.scheduler.GetMinDdlTs()

	switch minTs := funcutil.Min(bgTs, normalTs); minTs {
	case typeutil.ZeroTimestamp:
		// zero -> ddlTsLockManager and scheduler not started.
		logger.Warn("zero ts was met, this should be only occurred in starting state", zap.Uint64("minBgDdlTs", bgTs), zap.Uint64("minNormalDdlTs", normalTs))
	case typeutil.MaxTimestamp:
		// max -> abnormal case, impossible.
		logger.Warn("ddl ts is abnormal, max ts was met", zap.Uint64("minBgDdlTs", bgTs), zap.Uint64("minNormalDdlTs", normalTs))
	default:
		if err := c.sendTimeTick(minTs, "timetick loop"); err != nil {
			logger.Warn("failed to send timetick", zap.Error(err))
		}
	}
}

// startTimeTickLoop periodically calls sendMinDdlTsAsTt until the core context
// is cancelled or the streaming service reports ready. When the streaming
// service is enabled and already ready (or the listener cannot be
// registered), the loop never starts. It marks c.wg done on return.
func (c *Core) startTimeTickLoop() {
	defer c.wg.Done()
	logger := log.Ctx(c.ctx)

	notifier := snmanager.NewStreamingReadyNotifier()
	defer notifier.Release()

	if streamingutil.IsStreamingServiceEnabled() {
		err := snmanager.StaticStreamingNodeManager.RegisterStreamingEnabledListener(c.ctx, notifier)
		if err != nil {
			logger.Info("register streaming enabled listener failed", zap.Error(err))
			return
		}
		if notifier.IsReady() {
			logger.Info("streaming service has been enabled, ddl timetick from rootcoord should not start")
			return
		}
	}

	interval := Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			c.sendMinDdlTsAsTt()
		case <-c.ctx.Done():
			logger.Info("rootcoord's timetick loop quit!")
			return
		case <-notifier.Ready():
			logger.Info("streaming service has been enabled, ddl timetick from rootcoord should stop")
			return
		}
	}
}

// tsLoop refreshes the TSO allocator every tso2.UpdateTimestampStep and
// exports the last saved time as a metric. It exits when the core context is
// cancelled and marks c.wg done on return.
func (c *Core) tsLoop() {
	defer c.wg.Done()

	logger := log.Ctx(c.ctx)

	loopCtx, stop := context.WithCancel(c.ctx)
	defer stop()

	ticker := time.NewTicker(tso2.UpdateTimestampStep)
	defer ticker.Stop()

	for {
		select {
		case <-loopCtx.Done():
			logger.Info("rootcoord's ts loop quit!")
			return

		case <-ticker.C:
			err := c.tsoAllocator.UpdateTSO()
			if err != nil {
				logger.Warn("failed to update tso", zap.Error(err))
				continue
			}
			saved := c.tsoAllocator.GetLastSavedTime()
			metrics.RootCoordTimestampSaved.Set(float64(saved.Unix()))
		}
	}
}

// SetProxyCreator sets the function used to create proxy clients. NewCore
// installs proxyutil.DefaultProxyCreator by default.
func (c *Core) SetProxyCreator(f func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) {
	c.proxyCreator = f
}

// SetMixCoord sets the mixCoord of Core. It always returns nil.
func (c *Core) SetMixCoord(s types.MixCoord) error {
	c.mixCoord = s
	return nil
}

// Register is currently a no-op: it performs no registration and always
// returns nil.
func (c *Core) Register() error {
	return nil
}

// SetAddress sets the address of Core; initInternal includes it in its final
// log line.
func (c *Core) SetAddress(address string) {
	c.address = address
}

// SetEtcdClient sets the etcdCli of Core. The client is used for the etcd
// meta KV, the etcd-backed ID/TSO allocators and the proxy watcher.
func (c *Core) SetEtcdClient(etcdClient *clientv3.Client) {
	c.etcdCli = etcdClient
}

// SetTiKVClient sets the tikvCli of Core. The client is used for the TiKV
// meta KV and the TiKV-backed ID/TSO allocators.
func (c *Core) SetTiKVClient(client *txnkv.Client) {
	c.tikvCli = client
}

// SetSession stores the session on Core. The assignment always happens; an
// error is returned afterwards when the supplied session is nil.
func (c *Core) SetSession(session sessionutil.SessionInterface) error {
	c.session = session
	if session != nil {
		return nil
	}
	return errors.New("session is nil, the etcd client connection may have failed")
}

// initKVCreator installs the default metaKVCreator when none has been set:
// a TiKV-backed KV when the configured meta store type is TiKV, otherwise an
// etcd-backed KV. Root path and request timeout are read when the creator is
// invoked, not when it is installed.
func (c *Core) initKVCreator() {
	if c.metaKVCreator != nil {
		return
	}

	if Params.MetaStoreCfg.MetaStoreType.GetValue() != util.MetaStoreTypeTiKV {
		c.metaKVCreator = func() kv.MetaKv {
			rootPath := Params.EtcdCfg.MetaRootPath.GetValue()
			timeout := paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)
			return etcdkv.NewEtcdKV(c.etcdCli, rootPath, etcdkv.WithRequestTimeout(timeout))
		}
		return
	}

	c.metaKVCreator = func() kv.MetaKv {
		rootPath := Params.TiKVCfg.MetaRootPath.GetValue()
		timeout := paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)
		return tikv.NewTiKV(c.tikvCli, rootPath, tikv.WithRequestTimeout(timeout))
	}
}

// initMetaTable builds the catalog for the configured meta store (etcd or
// TiKV) and creates the meta table on top of it. The whole construction is
// retried up to 10 times; an unsupported store type aborts the retry
// immediately.
func (c *Core) initMetaTable(initCtx context.Context) error {
	build := func() error {
		// Pick the snapshot root path for the configured backend.
		var rootPath string
		switch Params.MetaStoreCfg.MetaStoreType.GetValue() {
		case util.MetaStoreTypeEtcd:
			log.Ctx(initCtx).Info("Using etcd as meta storage.")
			rootPath = Params.EtcdCfg.MetaRootPath.GetValue()
		case util.MetaStoreTypeTiKV:
			log.Ctx(initCtx).Info("Using tikv as meta storage.")
			rootPath = Params.TiKVCfg.MetaRootPath.GetValue()
		default:
			return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", Params.MetaStoreCfg.MetaStoreType.GetValue()))
		}

		metaKV := c.metaKVCreator()
		snapshot, err := kvmetastore.NewSuffixSnapshot(metaKV, kvmetastore.SnapshotsSep, rootPath, kvmetastore.SnapshotPrefix)
		if err != nil {
			return err
		}

		var catalog metastore.RootCoordCatalog = kvmetastore.NewCatalog(metaKV, snapshot)
		c.meta, err = NewMetaTable(c.ctx, catalog, c.tsoAllocator)
		return err
	}

	return retry.Do(initCtx, build, retry.Attempts(10))
}

// initIDAllocator creates and initializes the global ID allocator on top of a
// TiKV- or etcd-backed KV (depending on the configured meta store type) and
// stores it on Core.
func (c *Core) initIDAllocator(initCtx context.Context) error {
	var (
		kvPath string
		tsoKV  kv.TxnKV
	)
	switch Params.MetaStoreCfg.MetaStoreType.GetValue() {
	case util.MetaStoreTypeTiKV:
		kvPath = Params.TiKVCfg.KvRootPath.GetValue()
		tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, kvPath, globalIDAllocatorSubPath)
	default:
		kvPath = Params.EtcdCfg.KvRootPath.GetValue()
		tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, kvPath, globalIDAllocatorSubPath)
	}

	alloc := allocator.NewGlobalIDAllocator(globalIDAllocatorKey, tsoKV)
	err := alloc.Initialize()
	if err != nil {
		return err
	}
	c.idAllocator = alloc

	log.Ctx(initCtx).Info("id allocator initialized",
		zap.String("root_path", kvPath),
		zap.String("sub_path", globalIDAllocatorSubPath),
		zap.String("key", globalIDAllocatorKey))

	return nil
}

// initTSOAllocator creates and initializes the global TSO allocator on top of
// a TiKV- or etcd-backed KV (depending on the configured meta store type) and
// stores it on Core.
//
// The backing KV uses globalIDAllocatorSubPath, i.e. the same sub path as the
// ID allocator; the allocator itself is keyed by globalTSOAllocatorKey.
func (c *Core) initTSOAllocator(initCtx context.Context) error {
	var tsoKV kv.TxnKV
	var kvPath string
	if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
		kvPath = Params.TiKVCfg.KvRootPath.GetValue()
		tsoKV = tsoutil2.NewTSOTiKVBase(c.tikvCli, kvPath, globalIDAllocatorSubPath)
	} else {
		kvPath = Params.EtcdCfg.KvRootPath.GetValue()
		tsoKV = tsoutil2.NewTSOKVBase(c.etcdCli, kvPath, globalIDAllocatorSubPath)
	}
	tsoAllocator := tso2.NewGlobalTSOAllocator(globalTSOAllocatorKey, tsoKV)
	if err := tsoAllocator.Initialize(); err != nil {
		return err
	}
	c.tsoAllocator = tsoAllocator

	// Log the key the allocator was actually created with (the TSO key, not
	// the ID allocator key).
	log.Ctx(initCtx).Info("tso allocator initialized",
		zap.String("root_path", kvPath),
		zap.String("sub_path", globalIDAllocatorSubPath),
		zap.String("key", globalTSOAllocatorKey))

	return nil
}

// initInternal performs the one-time initialization of Core. In order, it
// moves the state to Initializing, creates the ID and TSO allocators, the
// meta table, the scheduler, the time-tick sync, the proxy client manager,
// the broker, the ddl-ts lock manager, the proxy watcher, the metrics cache
// and the quota center, and finally initializes credentials and RBAC.
// It returns the first error encountered.
func (c *Core) initInternal() error {
	initCtx, initSpan := log.NewIntentContext(typeutil.RootCoordRole, "initInternal")
	defer initSpan.End()
	log := log.Ctx(initCtx)

	c.UpdateStateCode(commonpb.StateCode_Initializing)

	if err := c.initIDAllocator(initCtx); err != nil {
		return err
	}

	if err := c.initTSOAllocator(initCtx); err != nil {
		return err
	}

	// The meta table is built with c.tsoAllocator, so it must come after the
	// TSO allocator.
	if err := c.initMetaTable(initCtx); err != nil {
		return err
	}

	c.scheduler = newScheduler(c.ctx, c.idAllocator, c.tsoAllocator)

	// NOTE(review): Init also calls c.factory.Init(Params) before invoking
	// initInternal, so this is a second call on that path.
	c.factory.Init(Params)
	chanMap := c.meta.ListCollectionPhysicalChannels(c.ctx)
	c.chanTimeTick = newTimeTickSync(initCtx, c.ctx, c.session.GetServerID(), c.factory, chanMap)
	log.Info("create TimeTick sync done")

	c.proxyClientManager = proxyutil.NewProxyClientManager(c.proxyCreator)

	c.broker = newServerBroker(c)
	c.ddlTsLockManager = newDdlTsLockManager(c.tsoAllocator)

	// Proxy session events are forwarded to both the time-tick sync and the
	// proxy client manager.
	c.proxyWatcher = proxyutil.NewProxyWatcher(
		c.etcdCli,
		c.chanTimeTick.initSessions,
		c.proxyClientManager.AddProxyClients,
	)
	c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
	c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
	log.Info("init proxy manager done")

	c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()

	c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.mixCoord, c.tsoAllocator, c.meta)
	log.Debug("RootCoord init QuotaCenter done")

	if err := c.initCredentials(initCtx); err != nil {
		return err
	}
	log.Info("init credentials done")

	if err := c.initRbac(initCtx); err != nil {
		return err
	}

	log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address))
	return nil
}

// registerMetricsRequest registers the handler for the SystemInfoMetrics
// request type; the handler delegates to getSystemInfoMetrics and ignores the
// parsed JSON request.
func (c *Core) registerMetricsRequest() {
	systemInfoHandler := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
		return c.getSystemInfoMetrics(ctx, req)
	}
	c.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, systemInfoHandler)

	log.Ctx(c.ctx).Info("register metrics actions finished")
}

// Init initializes RootCoord. It registers the metrics handlers, initializes
// the dependency factory and the meta KV creator, and then runs initInternal
// followed by RegisterDDLCallbacks exactly once (guarded by initOnce).
//
// It returns the error from initInternal on the call that actually performed
// the initialization; subsequent calls skip initialization and return nil.
// The success message is only logged when no error occurred.
func (c *Core) Init() error {
	log := log.Ctx(c.ctx)
	var initError error
	c.registerMetricsRequest()
	c.factory.Init(Params)
	c.initKVCreator()

	c.initOnce.Do(func() {
		initError = c.initInternal()
		RegisterDDLCallbacks(c)
	})
	if initError != nil {
		// Do not claim success when initialization failed.
		log.Warn("RootCoord init failed", zap.Error(initError))
		return initError
	}
	log.Info("RootCoord init successfully")
	return nil
}

// initCredentials delegates credential initialization to the meta table.
func (c *Core) initCredentials(initCtx context.Context) error {
	return c.meta.InitCredential(initCtx)
}

// initRbac creates the default roles, then (when enabled by configuration)
// grants the public-role privileges and initializes the builtin roles.
// Errors that common.IsIgnorableError accepts are skipped during role creation.
func (c *Core) initRbac(initCtx context.Context) error {
	// create default roles, including admin, public
	for _, roleName := range util.DefaultRoles {
		entity := &milvuspb.RoleEntity{Name: roleName}
		if err := c.meta.CreateRole(initCtx, util.DefaultTenant, entity); err != nil && !common.IsIgnorableError(err) {
			return errors.Wrap(err, "failed to create role")
		}
	}

	if Params.CommonCfg.EnablePublicPrivilege.GetAsBool() {
		if err := c.initPublicRolePrivilege(initCtx); err != nil {
			return err
		}
	}

	if !Params.RoleCfg.Enabled.GetAsBool() {
		return nil
	}
	return c.initBuiltinRoles(initCtx)
}

// initPublicRolePrivilege grants a fixed set of global and collection
// privileges to the public role, for any object name and any database, with
// the root user as grantor. Errors accepted by common.IsIgnorableError are
// skipped.
func (c *Core) initPublicRolePrivilege(initCtx context.Context) error {
	// grantToPublic issues one grant of privilege on objectType to the public role.
	grantToPublic := func(objectType, privilege string) error {
		entity := &milvuspb.GrantEntity{
			Role:       &milvuspb.RoleEntity{Name: util.RolePublic},
			Object:     &milvuspb.ObjectEntity{Name: objectType},
			ObjectName: util.AnyWord,
			DbName:     util.AnyWord,
			Grantor: &milvuspb.GrantorEntity{
				User:      &milvuspb.UserEntity{Name: util.UserRoot},
				Privilege: &milvuspb.PrivilegeEntity{Name: privilege},
			},
		}
		return c.meta.OperatePrivilege(initCtx, util.DefaultTenant, entity, milvuspb.OperatePrivilegeType_Grant)
	}

	// grant privileges for the public role
	globalPrivileges := []string{
		commonpb.ObjectPrivilege_PrivilegeDescribeCollection.String(),
		commonpb.ObjectPrivilege_PrivilegeListAliases.String(),
	}
	collectionPrivileges := []string{
		commonpb.ObjectPrivilege_PrivilegeIndexDetail.String(),
	}

	for _, privilege := range globalPrivileges {
		err := grantToPublic(commonpb.ObjectType_Global.String(), privilege)
		if err != nil && !common.IsIgnorableError(err) {
			return errors.Wrap(err, "failed to grant global privilege")
		}
	}
	for _, privilege := range collectionPrivileges {
		err := grantToPublic(commonpb.ObjectType_Collection.String(), privilege)
		if err != nil && !common.IsIgnorableError(err) {
			return errors.Wrap(err, "failed to grant collection privilege")
		}
	}
	return nil
}

// initBuiltinRoles creates every role described by the role configuration and
// grants it the configured privileges with the root user as grantor. Each
// successfully initialized role name is appended to util.BuiltinRoles.
// Errors accepted by common.IsIgnorableError are skipped for role creation
// and grants; a privilege-name lookup failure always aborts.
func (c *Core) initBuiltinRoles(ctx context.Context) error {
	logger := log.Ctx(ctx)

	for roleName, detail := range Params.RoleCfg.Roles.GetAsRoleDetails() {
		createErr := c.meta.CreateRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: roleName})
		if createErr != nil && !common.IsIgnorableError(createErr) {
			logger.Error("create a builtin role fail", zap.String("roleName", roleName), zap.Error(createErr))
			return errors.Wrapf(createErr, "failed to create a builtin role: %s", roleName)
		}

		for _, item := range detail[util.RoleConfigPrivileges] {
			privilegeName, err := c.getMetastorePrivilegeName(ctx, item[util.RoleConfigPrivilege])
			if err != nil {
				return errors.Wrapf(err, "failed to get metastore privilege name for: %s", item[util.RoleConfigPrivilege])
			}

			grant := &milvuspb.GrantEntity{
				Role:       &milvuspb.RoleEntity{Name: roleName},
				Object:     &milvuspb.ObjectEntity{Name: item[util.RoleConfigObjectType]},
				ObjectName: item[util.RoleConfigObjectName],
				DbName:     item[util.RoleConfigDBName],
				Grantor: &milvuspb.GrantorEntity{
					User:      &milvuspb.UserEntity{Name: util.UserRoot},
					Privilege: &milvuspb.PrivilegeEntity{Name: privilegeName},
				},
			}
			err = c.meta.OperatePrivilege(ctx, util.DefaultTenant, grant, milvuspb.OperatePrivilegeType_Grant)
			if err != nil && !common.IsIgnorableError(err) {
				logger.Error("grant privilege to builtin role fail", zap.String("roleName", roleName), zap.Any("privilege", item), zap.Error(err))
				return errors.Wrapf(err, "failed to grant privilege: <%s, %s, %s> of db: %s to role: %s", item[util.RoleConfigObjectType], item[util.RoleConfigObjectName], item[util.RoleConfigPrivilege], item[util.RoleConfigDBName], roleName)
			}
		}

		util.BuiltinRoles = append(util.BuiltinRoles, roleName)
		logger.Info("init a builtin role successfully", zap.String("roleName", roleName))
	}
	return nil
}

// restore creates the tombstone sweeper and re-registers tombstones for every
// collection or partition that the meta table reports in a Dropping or
// Creating state. A collection in such a state is registered as a whole and
// its partitions are not inspected.
func (c *Core) restore(ctx context.Context) error {
	dbs, err := c.meta.ListDatabases(ctx, typeutil.MaxTimestamp)
	if err != nil {
		return err
	}

	c.tombstoneSweeper = tombstone.NewTombstoneSweeper()
	for _, db := range dbs {
		colls, listErr := c.meta.ListCollections(ctx, db.Name, typeutil.MaxTimestamp, false)
		if listErr != nil {
			return listErr
		}

		// restore the tombstone into the tombstone sweeper.
		for _, coll := range colls {
			switch coll.State {
			case pb.CollectionState_CollectionDropping, pb.CollectionState_CollectionCreating:
				// CollectionCreating is a deprecated status,
				// we cannot promise the coordinator handle it correctly, so just treat it as a tombstone.
				c.tombstoneSweeper.AddTombstone(newCollectionTombstone(c.meta, c.broker, coll.CollectionID))
				continue
			}

			for _, part := range coll.Partitions {
				switch part.State {
				case pb.PartitionState_PartitionDropping, pb.PartitionState_PartitionCreating:
					c.tombstoneSweeper.AddTombstone(newPartitionTombstone(c.meta, c.broker, coll.CollectionID, part.PartitionID))
				}
			}
		}
	}
	return nil
}

// startInternal performs the one-time startup of Core: it starts watching
// proxies, restores tombstones, optionally starts the quota center, starts
// the scheduler, kicks off an asynchronous RBAC cache refresh, launches the
// background loops, and finally marks the core Healthy.
//
// Failures to watch proxies or to restore are not returned: both paths
// panic. As written, the function only ever returns nil.
func (c *Core) startInternal() error {
	log := log.Ctx(c.ctx)
	if err := c.proxyWatcher.WatchProxy(c.ctx); err != nil {
		// NOTE(review): log.Fatal presumably terminates the process, which
		// would make the panic below a fallback only — confirm.
		log.Fatal("rootcoord failed to watch proxy", zap.Error(err))
		// you can not just stuck here,
		panic(err)
	}

	if err := c.restore(c.ctx); err != nil {
		panic(err)
	}

	if Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
		c.quotaCenter.Start()
	}

	c.scheduler.Start()
	// This goroutine is not added to c.wg; it is bounded by c.ctx and by the
	// retry limit (100 attempts, retry.Sleep(time.Second)).
	go func() {
		// refresh rbac cache
		if err := retry.Do(c.ctx, func() error {
			if err := c.proxyClientManager.RefreshPolicyInfoCache(c.ctx, &proxypb.RefreshPolicyInfoCacheRequest{
				OpType: int32(typeutil.CacheRefresh),
			}); err != nil {
				log.RatedWarn(60, "fail to refresh policy info cache", zap.Error(err))
				return err
			}
			return nil
		}, retry.Attempts(100), retry.Sleep(time.Second)); err != nil {
			log.Warn("fail to refresh policy info cache", zap.Error(err))
		}
	}()

	c.startServerLoop()
	// The state only becomes Healthy after the background loops are launched.
	c.UpdateStateCode(commonpb.StateCode_Healthy)
	sessionutil.SaveServerInfo(typeutil.MixCoordRole, c.session.GetServerID())
	log.Info("rootcoord startup successfully")
	return nil
}

// startServerLoop launches the three background goroutines of Core (TSO
// update loop, DDL time-tick loop and the time-tick watcher) and registers
// them on c.wg so that Stop can wait for them.
func (c *Core) startServerLoop() {
	// The count must match the number of goroutines started below; tsLoop and
	// startTimeTickLoop call c.wg.Done themselves, startWatch receives the
	// WaitGroup to do the same.
	c.wg.Add(3)
	go c.tsLoop()
	go c.startTimeTickLoop()
	go c.chanTimeTick.startWatch(&c.wg)
}

// Start starts RootCoord. The actual startup (startInternal) runs at most
// once; later calls are no-ops that return nil.
func (c *Core) Start() error {
	var startErr error
	runOnce := func() {
		startErr = c.startInternal()
	}
	c.startOnce.Do(runOnce)
	return startErr
}

// stopScheduler stops the scheduler if one has been created.
func (c *Core) stopScheduler() {
	if c.scheduler == nil {
		return
	}
	c.scheduler.Stop()
	log.Ctx(c.ctx).Info("stop rootcoord scheduler")
}

// cancelIfNotNil cancels the core context when a cancel function is present.
func (c *Core) cancelIfNotNil() {
	if c.cancel == nil {
		return
	}
	c.cancel()
	log.Ctx(c.ctx).Info("cancel rootcoord goroutines")
}

// revokeSession stops the session if one has been set.
func (c *Core) revokeSession() {
	if c.session == nil {
		return
	}
	// wait at most one second to revoke
	c.session.Stop()
	log.Ctx(c.ctx).Info("rootcoord session stop")
}

// GracefulStop is intentionally a no-op; use Stop to shut the core down.
func (c *Core) GracefulStop() {
}

// Stop stops rootCoord. It marks the core Abnormal, closes the tombstone
// sweeper, stops the scheduler, the proxy watcher and the quota center,
// stops the session, cancels the core context and then waits for the
// background goroutines tracked by c.wg. Components that were never created
// are skipped. It always returns nil.
func (c *Core) Stop() error {
	c.UpdateStateCode(commonpb.StateCode_Abnormal)
	if c.tombstoneSweeper != nil {
		c.tombstoneSweeper.Close()
	}
	c.stopScheduler()

	if c.proxyWatcher != nil {
		c.proxyWatcher.Stop()
	}
	if c.quotaCenter != nil {
		c.quotaCenter.stop()
	}

	c.revokeSession()
	// Cancel before waiting: the background loops exit on c.ctx.Done().
	c.cancelIfNotNil()
	c.wg.Wait()
	return nil
}

// GetComponentStates reports the current state code of RootCoord, both as the
// top-level state and as a single subcomponent entry. The node ID is
// common.NotRegisteredID until a session exists and reports itself registered.
func (c *Core) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
	code := c.GetStateCode()
	log.Ctx(ctx).Debug("RootCoord current state", zap.String("StateCode", code.String()))

	// The ID is read through the session interface rather than directly from
	// c.session.ServerID, which would race with Core.Register().
	nodeID := common.NotRegisteredID
	if c.session != nil && c.session.Registered() {
		nodeID = c.session.GetServerID()
	}

	// newInfo builds a fresh ComponentInfo so the two entries never alias.
	newInfo := func() *milvuspb.ComponentInfo {
		return &milvuspb.ComponentInfo{
			NodeID:    nodeID,
			Role:      typeutil.RootCoordRole,
			StateCode: code,
			ExtraInfo: nil,
		}
	}

	states := &milvuspb.ComponentStates{
		State:              newInfo(),
		Status:             merr.Success(),
		SubcomponentStates: []*milvuspb.ComponentInfo{newInfo()},
	}
	return states, nil
}

// GetTimeTickChannel returns the configured RootCoord time-tick channel name
// with a success status.
func (c *Core) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
	resp := &milvuspb.StringResponse{}
	resp.Status = merr.Success()
	resp.Value = Params.CommonCfg.RootCoordTimeTick.GetValue()
	return resp, nil
}

// GetStatisticsChannel returns the configured RootCoord statistics channel
// name with a success status.
func (c *Core) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
	resp := &milvuspb.StringResponse{}
	resp.Status = merr.Success()
	resp.Value = Params.CommonCfg.RootCoordStatistics.GetValue()
	return resp, nil
}

// CreateDatabase handles a create-database request by broadcasting it. It
// rejects the request when the core is not healthy, and records the
// total/fail/success counters plus the latency metric for "CreateDatabase".
// Failures are reported through the returned status; the error result is
// always nil.
func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	const method = "CreateDatabase"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	recorder := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx)

	logger.Info("received request to create database", zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))

	err := c.broadcastCreateDatabase(ctx, in)
	if err != nil {
		logger.Info("failed to create database",
			zap.String("role", typeutil.RootCoordRole),
			zap.Error(err),
			zap.String("dbName", in.GetDbName()),
			zap.Int64("msgID", in.GetBase().GetMsgID()))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	elapsedMs := recorder.ElapseSpan().Milliseconds()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(elapsedMs))
	logger.Info("done to create database", zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.Int64("msgID", in.GetBase().GetMsgID()))
	return merr.Success(), nil
}

// DropDatabase handles a drop-database request by broadcasting it. It rejects
// the request when the core is not healthy, and records the
// total/fail/success counters plus the latency metric for "DropDatabase".
//
// Dropping a database that does not exist is treated as success; that path
// also increments the success counter so that every request counted under the
// total label ends up under either success or fail (matching the ignore paths
// of CreateCollection and DropCollection). Failures are reported through the
// returned status; the error result is always nil.
func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	method := "DropDatabase"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("DropDatabase")

	log.Ctx(ctx).Info("received request to drop database", zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))

	if err := c.broadcastDropDatabase(ctx, in); err != nil {
		if errors.Is(err, merr.ErrDatabaseNotFound) {
			log.Ctx(ctx).Info("drop a database that not found, ignore it", zap.String("dbName", in.GetDbName()))
			// An ignored drop is reported as success to the caller, so count it as one.
			metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
			return merr.Success(), nil
		}
		log.Ctx(ctx).Info("failed to drop database", zap.String("role", typeutil.RootCoordRole),
			zap.Error(err),
			zap.String("dbName", in.GetDbName()),
			zap.Int64("msgID", in.GetBase().GetMsgID()))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	// Per-database metrics are only cleaned up after an actual drop.
	metrics.CleanupRootCoordDBMetrics(in.GetDbName())
	log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
	return merr.Success(), nil
}

// ListDatabases handles a list-databases request by enqueueing a
// listDatabaseTask on the scheduler and waiting for it to finish. It rejects
// the request when the core is not healthy and records the
// total/fail/success counters plus the latency metric for "ListDatabases".
// Failures are reported through the response status; the error result is
// always nil.
func (c *Core) ListDatabases(ctx context.Context, in *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.ListDatabasesResponse{Status: merr.Status(err)}, nil
	}

	const method = "ListDatabases"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	recorder := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(zap.Int64("msgID", in.GetBase().GetMsgID()))
	logger.Info("received request to list databases")

	task := &listDatabaseTask{
		baseTask: newBaseTask(ctx, c),
		Req:      in,
		Resp:     &milvuspb.ListDatabasesResponse{},
	}

	err := c.scheduler.AddTask(task)
	if err != nil {
		logger.Info("failed to enqueue request to list databases", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.ListDatabasesResponse{Status: merr.Status(err)}, nil
	}

	err = task.WaitToFinish()
	if err != nil {
		logger.Info("failed to list databases", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.ListDatabasesResponse{Status: merr.Status(err)}, nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	elapsedMs := recorder.ElapseSpan().Milliseconds()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(elapsedMs))
	logger.Info("done to list databases", zap.Int("num of databases", len(task.Resp.GetDbNames())))
	return task.Resp, nil
}

// CreateCollection handles a create-collection request by broadcasting it.
// It rejects the request when the core is not healthy. When the broadcast
// reports errIgnoredCreateCollection the request is counted and returned as a
// success. Failures are reported through the returned status; the error
// result is always nil.
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	const method = "CreateCollection"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	recorder := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()))
	logger.Info("received request to create collection")

	err := c.broadcastCreateCollectionV1(ctx, in)
	switch {
	case err == nil:
		// fall through to the success bookkeeping below
	case errors.Is(err, errIgnoredCreateCollection):
		logger.Info("create existed collection with same schema, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Info("failed to create collection", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	elapsedMs := recorder.ElapseSpan().Milliseconds()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(elapsedMs))
	logger.Info("done to create collection")
	return merr.Success(), nil
}

// AddCollectionField handles an add-field request by broadcasting an alter
// collection. It rejects the request when the core is not healthy and records
// the total/fail/success counters plus the latency metric for
// "AddCollectionField". Failures are reported through the returned status;
// the error result is always nil.
func (c *Core) AddCollectionField(ctx context.Context, in *milvuspb.AddCollectionFieldRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	const method = "AddCollectionField"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	recorder := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()))
	logger.Info("received request to add collection field")

	err := c.broadcastAlterCollectionForAddField(ctx, in)
	if err != nil {
		logger.Info("failed to add collection field", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	elapsedMs := recorder.ElapseSpan().Milliseconds()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(elapsedMs))
	logger.Info("done to add collection field")
	return merr.Success(), nil
}

// DropCollection handles a drop-collection request by broadcasting it. It
// rejects the request when the core is not healthy. When the broadcast
// reports errIgnoredDropCollection the request is counted and returned as a
// success. Failures are reported through the returned status; the error
// result is always nil.
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	const method = "DropCollection"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	recorder := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("name", in.GetCollectionName()))
	logger.Info("received request to drop collection")

	err := c.broadcastDropCollectionV1(ctx, in)
	switch {
	case err == nil:
		// fall through to the success bookkeeping below
	case errors.Is(err, errIgnoredDropCollection):
		logger.Info("drop collection that not found, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Info("failed to drop collection", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	elapsedMs := recorder.ElapseSpan().Milliseconds()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(elapsedMs))
	logger.Info("done to drop collection")
	return merr.Success(), nil
}

// HasCollection reports whether a collection exists.
// The request is wrapped in a hasCollectionTask, enqueued on the scheduler and
// awaited; the task's response is returned on success. Failures are reported in
// the response status and the error result is always nil.
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &milvuspb.BoolResponse{Status: merr.Status(healthErr)}, nil
	}

	const method = "HasCollection"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	travelTs := getTravelTs(in)
	logger := log.Ctx(ctx).With(
		zap.String("collectionName", in.GetCollectionName()),
		zap.Uint64("ts", travelTs),
	)

	// fail logs the error, bumps the failure counter and wraps err in a response.
	fail := func(msg string, err error) (*milvuspb.BoolResponse, error) {
		logger.Info(msg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.BoolResponse{Status: merr.Status(err)}, nil
	}

	task := &hasCollectionTask{
		baseTask: newBaseTask(ctx, c),
		Req:      in,
		Rsp:      &milvuspb.BoolResponse{},
	}
	if err := c.scheduler.AddTask(task); err != nil {
		return fail("failed to enqueue request to has collection", err)
	}
	if err := task.WaitToFinish(); err != nil {
		return fail("failed to has collection", err)
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(task.queueDur.Milliseconds()))

	return task.Rsp, nil
}

// getCollectionIDStr returns the collection ID formatted as a decimal string,
// so that callers work with the ID rather than a name that may be an alias.
//
// A non-zero collectionID is formatted directly. Otherwise the collection is
// looked up by name at typeutil.MaxTimestamp; if the lookup fails, "-1" is
// returned instead of an error, and the failure is left to surface during the
// execution phase.
func (c *Core) getCollectionIDStr(ctx context.Context, db, collectionName string, collectionID int64) string {
	id := collectionID
	if id == 0 {
		coll, err := c.meta.GetCollectionByName(ctx, db, collectionName, typeutil.MaxTimestamp)
		if err != nil {
			return "-1"
		}
		id = coll.CollectionID
	}
	return strconv.FormatInt(id, 10)
}

// describeCollection fetches the collection model for the request at its travel
// timestamp. A non-empty collection name takes precedence and is resolved by
// name; only when the name is empty is the collection looked up by ID, which is
// the only path that receives allowUnavailable.
func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*model.Collection, error) {
	travelTs := getTravelTs(in)
	name := in.GetCollectionName()
	if name == "" {
		return c.meta.GetCollectionByID(ctx, in.GetDbName(), in.GetCollectionID(), travelTs, allowUnavailable)
	}
	return c.meta.GetCollectionByName(ctx, in.GetDbName(), name, travelTs)
}

// convertModelToDesc builds a successful DescribeCollectionResponse from the
// collection model, the given aliases and the database name.
//
// When collInfo.ShardsNum is zero, the response reports the number of virtual
// channels instead. The fallback is computed in a local variable: collInfo is
// only read and never modified, so a model shared with other readers is not
// changed as a side effect of describing it.
func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName string) *milvuspb.DescribeCollectionResponse {
	resp := &milvuspb.DescribeCollectionResponse{
		Status: merr.Success(),
		DbName: dbName,
	}

	resp.Schema = &schemapb.CollectionSchema{
		Name:               collInfo.Name,
		Description:        collInfo.Description,
		AutoID:             collInfo.AutoID,
		Fields:             model.MarshalFieldModels(collInfo.Fields),
		StructArrayFields:  model.MarshalStructArrayFieldModels(collInfo.StructArrayFields),
		Functions:          model.MarshalFunctionModels(collInfo.Functions),
		EnableDynamicField: collInfo.EnableDynamicField,
		Properties:         collInfo.Properties,
	}
	resp.CollectionID = collInfo.CollectionID
	resp.VirtualChannelNames = collInfo.VirtualChannelNames
	resp.PhysicalChannelNames = collInfo.PhysicalChannelNames

	// Derive the shard count without writing back into the input model.
	shardsNum := collInfo.ShardsNum
	if shardsNum == 0 {
		shardsNum = int32(len(collInfo.VirtualChannelNames))
	}
	resp.ShardsNum = shardsNum
	resp.ConsistencyLevel = collInfo.ConsistencyLevel

	resp.CreatedTimestamp = collInfo.CreateTime
	// Only the physical component of the hybrid timestamp is exposed here.
	createdPhysicalTime, _ := tsoutil.ParseHybridTs(collInfo.CreateTime)
	resp.CreatedUtcTimestamp = uint64(createdPhysicalTime)
	resp.Aliases = aliases
	resp.StartPositions = collInfo.StartPositions
	resp.CollectionName = resp.Schema.Name
	resp.Properties = collInfo.Properties
	resp.NumPartitions = int64(len(collInfo.Partitions))
	resp.DbId = collInfo.DBID
	resp.UpdateTimestamp = collInfo.UpdateTimestamp
	resp.UpdateTimestampStr = strconv.FormatUint(collInfo.UpdateTimestamp, 10)
	return resp
}

// describeCollectionImpl is the shared implementation behind DescribeCollection
// and DescribeCollectionInternal. It runs a describeCollectionTask on the
// scheduler, passing allowUnavailable through to the task, and returns the
// task's response. Failures are reported in the response status and the error
// result is always nil.
func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &milvuspb.DescribeCollectionResponse{Status: merr.Status(healthErr)}, nil
	}

	const method = "DescribeCollection"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	travelTs := getTravelTs(in)
	logger := log.Ctx(ctx).With(
		zap.String("collectionName", in.GetCollectionName()),
		zap.String("dbName", in.GetDbName()),
		zap.Int64("id", in.GetCollectionID()),
		zap.Uint64("ts", travelTs),
		zap.Bool("allowUnavailable", allowUnavailable),
	)

	// failResp bumps the failure counter and wraps err in a response; logging
	// stays at the call sites because the two failure paths use different levels.
	failResp := func(err error) (*milvuspb.DescribeCollectionResponse, error) {
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.DescribeCollectionResponse{Status: merr.Status(err)}, nil
	}

	task := &describeCollectionTask{
		baseTask:         newBaseTask(ctx, c),
		Req:              in,
		Rsp:              &milvuspb.DescribeCollectionResponse{Status: merr.Success()},
		allowUnavailable: allowUnavailable,
	}
	if err := c.scheduler.AddTask(task); err != nil {
		logger.Info("failed to enqueue request to describe collection", zap.Error(err))
		return failResp(err)
	}
	if err := task.WaitToFinish(); err != nil {
		logger.Warn("failed to describe collection", zap.Error(err))
		return failResp(err)
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(task.queueDur.Milliseconds()))

	return task.Rsp, nil
}

// DescribeCollection returns the description of a collection. It delegates to
// describeCollectionImpl with allowUnavailable set to false.
func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
	const allowUnavailable = false
	return c.describeCollectionImpl(ctx, in, allowUnavailable)
}

// DescribeCollectionInternal behaves like DescribeCollection but also returns
// unavailable collections; it is only used by internal RPCs.
//
// Background: while the query cluster is recovering, it does not become healthy
// until the targets of all collections have been recovered, so release requests
// issued by rootcoord's own recovery cannot succeed during that window. The
// intended sequence is: rootcoord becomes healthy, querycoord recovers all
// collection targets, and only then does querycoord serve the release requests
// sent by rootcoord, so that dropping collections are eventually released.
func (c *Core) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
	const allowUnavailable = true
	return c.describeCollectionImpl(ctx, in, allowUnavailable)
}

// ShowCollections lists collections for the request.
// The request is wrapped in a showCollectionTask, enqueued on the scheduler and
// awaited; the task's response is returned on success. Failures are reported in
// the response status and the error result is always nil.
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &milvuspb.ShowCollectionsResponse{Status: merr.Status(healthErr)}, nil
	}

	const method = "ShowCollections"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	travelTs := getTravelTs(in)
	logger := log.Ctx(ctx).With(
		zap.String("dbname", in.GetDbName()),
		zap.Uint64("ts", travelTs),
	)

	// fail logs the error, bumps the failure counter and wraps err in a response.
	fail := func(msg string, err error) (*milvuspb.ShowCollectionsResponse, error) {
		logger.Info(msg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.ShowCollectionsResponse{Status: merr.Status(err)}, nil
	}

	task := &showCollectionTask{
		baseTask: newBaseTask(ctx, c),
		Req:      in,
		Rsp:      &milvuspb.ShowCollectionsResponse{},
	}
	if err := c.scheduler.AddTask(task); err != nil {
		return fail("failed to enqueue request to show collections", err)
	}
	if err := task.WaitToFinish(); err != nil {
		return fail("failed to show collections", err)
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(task.queueDur.Milliseconds()))

	return task.Rsp, nil
}

// ShowCollectionIDs returns the collection IDs of the requested databases,
// grouped per database. With no database names in the request, every database
// returned by ListDatabases is included. Collections are listed at
// typeutil.MaxTimestamp, and the negation of in.GetAllowUnavailable() is passed
// as the last argument of ListCollections. Failures are reported in the
// response status and the error result is always nil.
func (c *Core) ShowCollectionIDs(ctx context.Context, in *rootcoordpb.ShowCollectionIDsRequest) (*rootcoordpb.ShowCollectionIDsResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &rootcoordpb.ShowCollectionIDsResponse{Status: merr.Status(healthErr)}, nil
	}

	const method = "ShowCollectionIDs"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	ts := typeutil.MaxTimestamp
	logger := log.Ctx(ctx).With(zap.Strings("dbNames", in.GetDbNames()), zap.Bool("allowUnavailable", in.GetAllowUnavailable()))

	// fail logs the error, bumps the failure counter and wraps err in a response.
	fail := func(msg string, err error) (*rootcoordpb.ShowCollectionIDsResponse, error) {
		logger.Info(msg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &rootcoordpb.ShowCollectionIDsResponse{Status: merr.Status(err)}, nil
	}

	// Currently, this interface is only called during startup, so there is no need to execute it within the scheduler.
	var dbs []*model.Database
	if names := in.GetDbNames(); len(names) > 0 {
		dbs = make([]*model.Database, 0, len(names))
		for _, name := range names {
			db, err := c.meta.GetDatabaseByName(ctx, name, ts)
			if err != nil {
				return fail("failed to GetDatabaseByName", err)
			}
			dbs = append(dbs, db)
		}
	} else {
		// No names given: show the collections of all databases.
		all, err := c.meta.ListDatabases(ctx, ts)
		if err != nil {
			return fail("failed to ListDatabases", err)
		}
		dbs = all
	}

	dbCollections := make([]*rootcoordpb.DBCollections, 0, len(dbs))
	for _, db := range dbs {
		collections, err := c.meta.ListCollections(ctx, db.Name, ts, !in.GetAllowUnavailable())
		if err != nil {
			return fail("failed to ListCollections", err)
		}
		ids := make([]int64, len(collections))
		for i, col := range collections {
			ids[i] = col.CollectionID
		}
		dbCollections = append(dbCollections, &rootcoordpb.DBCollections{
			DbName:        db.Name,
			CollectionIDs: ids,
		})
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))

	logger.Info("ShowCollectionIDs done", zap.Any("collectionIDs", dbCollections))

	return &rootcoordpb.ShowCollectionIDsResponse{
		Status:        merr.Success(),
		DbCollections: dbCollections,
	}, nil
}

// AlterCollection handles a request to change or delete collection properties.
// The change is applied through broadcastAlterCollectionForAlterCollection. An
// errIgnoredAlterCollection result (no changes) is reported as success; any
// other error is returned inside the status. The error result is always nil.
func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "AlterCollection"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()),
		zap.Any("props", in.Properties),
		zap.Strings("deleteKeys", in.DeleteKeys),
	)
	logger.Info("received request to alter collection")

	switch err := c.broadcastAlterCollectionForAlterCollection(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		logger.Info("done to alter collection")
		return merr.Success(), nil
	case errors.Is(err, errIgnoredAlterCollection):
		// A no-op alteration counts as a success but records no latency sample.
		logger.Info("alter collection make no changes, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Warn("failed to alter collection", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// AddCollectionFunction handles a request to add a function to a collection.
// The change is applied through broadcastAlterCollectionForAddFunction. An
// errIgnoredAlterCollection result (no changes) is reported as success; any
// other error is returned inside the status. The error result is always nil.
func (c *Core) AddCollectionFunction(ctx context.Context, in *milvuspb.AddCollectionFunctionRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues("AddCollectionFunction", metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("AddCollectionFunction")

	log := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()),
	)
	log.Info("received request to Add collection function")

	if err := c.broadcastAlterCollectionForAddFunction(ctx, in); err != nil {
		if errors.Is(err, errIgnoredAlterCollection) {
			// A no-op counts as a success but records no latency sample.
			log.Info("add collection function make no changes, ignore it")
			metrics.RootCoordDDLReqCounter.WithLabelValues("AddCollectionFunction", metrics.SuccessLabel).Inc()
			return merr.Success(), nil
		}
		// Name the add operation here so failures are not mistaken for
		// AlterCollectionFunction failures when reading logs.
		log.Warn("failed to add collection function", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues("AddCollectionFunction", metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues("AddCollectionFunction", metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues("AddCollectionFunction").Observe(float64(tr.ElapseSpan().Milliseconds()))
	log.Info("done to add collection function")
	return merr.Success(), nil
}

// AlterCollectionFunction handles a request to alter a function of a collection.
// The change is applied through broadcastAlterCollectionForAlterFunction. An
// errIgnoredAlterCollection result (no changes) is reported as success; any
// other error is returned inside the status. The error result is always nil.
func (c *Core) AlterCollectionFunction(ctx context.Context, in *milvuspb.AlterCollectionFunctionRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "AlterCollectionFunction"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()),
	)
	logger.Info("received request to alter collection function")

	switch err := c.broadcastAlterCollectionForAlterFunction(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		logger.Info("done to alter collection function")
		return merr.Success(), nil
	case errors.Is(err, errIgnoredAlterCollection):
		// A no-op counts as a success but records no latency sample.
		logger.Info("alter collection function make no changes, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Warn("failed to alter collection function", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// DropCollectionFunction handles a request to drop a function from a collection.
// The change is applied through broadcastAlterCollectionForDropFunction. An
// errIgnoredAlterCollection result (no changes) is reported as success; any
// other error is returned inside the status. The error result is always nil.
func (c *Core) DropCollectionFunction(ctx context.Context, in *milvuspb.DropCollectionFunctionRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "DropCollectionFunction"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()),
		zap.String("functionName", in.GetFunctionName()),
	)
	logger.Info("received request to drop collection function")

	switch err := c.broadcastAlterCollectionForDropFunction(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		logger.Info("done to drop collection function")
		return merr.Success(), nil
	case errors.Is(err, errIgnoredAlterCollection):
		// A no-op counts as a success but records no latency sample.
		logger.Info("Drop collection function make no changes, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Warn("failed to drop collection function", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// AlterCollectionField handles a request to change or delete properties of a
// collection field. The change is applied through
// broadcastAlterCollectionV2ForAlterCollectionField. An errIgnoredAlterCollection
// result (no changes) is reported as success; any other error is returned
// inside the status. The error result is always nil.
func (c *Core) AlterCollectionField(ctx context.Context, in *milvuspb.AlterCollectionFieldRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "AlterCollectionField"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()),
		zap.String("fieldName", in.GetFieldName()),
		zap.Any("props", in.Properties),
		zap.Strings("deleteKeys", in.DeleteKeys),
	)
	logger.Info("received request to alter collection field")

	switch err := c.broadcastAlterCollectionV2ForAlterCollectionField(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		logger.Info("done to alter collection field")
		return merr.Success(), nil
	case errors.Is(err, errIgnoredAlterCollection):
		// A no-op counts as a success but records no latency sample.
		logger.Info("alter collection field make no changes, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Warn("failed to alter collection field", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// AlterDatabase handles a request to change or delete database properties.
// The change is applied through broadcastAlterDatabase. An
// errIgnoredAlterDatabase result (no changes) is reported as success; any other
// error is returned inside the status. The error result is always nil.
func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "AlterDatabase"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.Any("props", in.Properties),
		zap.Strings("deleteKeys", in.DeleteKeys),
	)
	logger.Info("received request to alter database")

	switch err := c.broadcastAlterDatabase(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		logger.Info("done to alter database")
		return merr.Success(), nil
	case errors.Is(err, errIgnoredAlterDatabase):
		// A no-op counts as a success but records no latency sample.
		logger.Info("alter database make no changes, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		logger.Warn("failed to alter database", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// CreatePartition handles a request to create a partition in a collection.
// The creation is applied through broadcastCreatePartition. An
// errIgnoerdCreatePartition result (partition already exists) is reported as
// success; any other error is returned inside the status. The error result is
// always nil.
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "CreatePartition"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	reqLog := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()),
		zap.String("partitionName", in.GetPartitionName()),
	)
	reqLog.Info("received request to create partition")

	switch err := c.broadcastCreatePartition(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		reqLog.Info("done to create partition")
		return merr.Success(), nil
	case errors.Is(err, errIgnoerdCreatePartition):
		// The ignored case counts as a success but records no latency sample.
		reqLog.Info("create partition that already exists, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		reqLog.Info("failed to create partition", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// DropPartition handles a request to drop a partition from a collection.
// The drop is applied through broadcastDropPartition. An
// errIgnoredDropPartition result (partition not found) is reported as success;
// any other error is returned inside the status. The error result is always nil.
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return merr.Status(healthErr), nil
	}

	const method = "DropPartition"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	reqLog := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("collection", in.GetCollectionName()),
		zap.String("partition", in.GetPartitionName()),
	)
	reqLog.Info("received request to drop partition")

	switch err := c.broadcastDropPartition(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
		reqLog.Info("done to drop partition")
		return merr.Success(), nil
	case errors.Is(err, errIgnoredDropPartition):
		// The ignored case counts as a success but records no latency sample.
		reqLog.Info("drop partition that not found, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		return merr.Success(), nil
	default:
		reqLog.Warn("failed to drop partition", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
}

// HasPartition reports whether a partition exists in a collection.
// The request is wrapped in a hasPartitionTask, enqueued on the scheduler and
// awaited; the task's response is returned on success. Failures are reported in
// the response status and the error result is always nil.
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &milvuspb.BoolResponse{Status: merr.Status(healthErr)}, nil
	}

	const method = "HasPartition"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	// TODO(longjiquan): why HasPartitionRequest doesn't contain Timestamp but other requests do.
	ts := typeutil.MaxTimestamp
	logger := log.Ctx(ctx).With(
		zap.String("collection", in.GetCollectionName()),
		zap.String("partition", in.GetPartitionName()),
		zap.Uint64("ts", ts),
	)

	// fail logs the error, bumps the failure counter and wraps err in a response.
	fail := func(msg string, err error) (*milvuspb.BoolResponse, error) {
		logger.Info(msg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.BoolResponse{Status: merr.Status(err)}, nil
	}

	task := &hasPartitionTask{
		baseTask: newBaseTask(ctx, c),
		Req:      in,
		Rsp:      &milvuspb.BoolResponse{},
	}
	if err := c.scheduler.AddTask(task); err != nil {
		return fail("failed to enqueue request to has partition", err)
	}
	if err := task.WaitToFinish(); err != nil {
		return fail("failed to has partition", err)
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(task.queueDur.Milliseconds()))

	return task.Rsp, nil
}

// showPartitionsImpl is the shared implementation behind ShowPartitions and
// ShowPartitionsInternal. It runs a showPartitionTask on the scheduler, passing
// allowUnavailable through to the task, and returns the task's response.
// Failures are reported in the response status and the error result is always
// nil.
func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitionsRequest, allowUnavailable bool) (*milvuspb.ShowPartitionsResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &milvuspb.ShowPartitionsResponse{Status: merr.Status(healthErr)}, nil
	}

	const method = "ShowPartitions"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	logger := log.Ctx(ctx).With(
		zap.String("collection", in.GetCollectionName()),
		zap.Int64("collection_id", in.GetCollectionID()),
		zap.Strings("partitions", in.GetPartitionNames()),
		zap.Bool("allowUnavailable", allowUnavailable),
	)

	// fail logs the error, bumps the failure counter and wraps err in a response.
	fail := func(msg string, err error) (*milvuspb.ShowPartitionsResponse, error) {
		logger.Info(msg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.ShowPartitionsResponse{Status: merr.Status(err)}, nil
	}

	task := &showPartitionTask{
		baseTask:         newBaseTask(ctx, c),
		Req:              in,
		Rsp:              &milvuspb.ShowPartitionsResponse{},
		allowUnavailable: allowUnavailable,
	}
	if err := c.scheduler.AddTask(task); err != nil {
		return fail("failed to enqueue request to show partitions", err)
	}
	if err := task.WaitToFinish(); err != nil {
		return fail("failed to show partitions", err)
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(task.queueDur.Milliseconds()))

	return task.Rsp, nil
}

// ShowPartitions lists the partitions of a collection. It delegates to
// showPartitionsImpl with allowUnavailable set to false.
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
	const allowUnavailable = false
	return c.showPartitionsImpl(ctx, in, allowUnavailable)
}

// ShowPartitionsInternal is the internal-RPC variant of ShowPartitions. It
// delegates to showPartitionsImpl with allowUnavailable set to true.
func (c *Core) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
	const allowUnavailable = true
	return c.showPartitionsImpl(ctx, in, allowUnavailable)
}

// ShowSegments always returns an empty successful response; the request is
// not inspected.
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
	// ShowSegments is only used by GetPersistentSegmentInfo and has been
	// deprecated for a long time. The current logic is kept, although it is not
	// fully right since RootCoord only contains indexed segments.
	resp := &milvuspb.ShowSegmentsResponse{Status: merr.Success()}
	return resp, nil
}

// GetPChannelInfo returns the info that the meta table holds for the requested
// physical channel. When rootcoord is unhealthy, only the status is set in the
// response. The error result is always nil.
func (c *Core) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		unhealthy := &rootcoordpb.GetPChannelInfoResponse{Status: merr.Status(healthErr)}
		return unhealthy, nil
	}
	pchannel := in.GetPchannel()
	return c.meta.GetPChannelInfo(ctx, pchannel), nil
}

// AllocTimestamp allocates in.GetCount() timestamps from the TSO allocator and
// returns the first available one together with the count.
//
// When the request carries a BlockTimestamp that is later than the allocator's
// last saved time, the call first waits for the difference plus a 200ms margin.
// The wait is abandoned as soon as ctx is done, in which case the context error
// is reported in the response status. Failures are always reported through the
// status; the error result is always nil.
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &rootcoordpb.AllocTimestampResponse{
			Status: merr.Status(err),
		}, nil
	}

	if blockTs := in.GetBlockTimestamp(); blockTs > 0 {
		blockTime, _ := tsoutil.ParseTS(blockTs)
		lastTime := c.tsoAllocator.GetLastSavedTime()
		if deltaDuration := blockTime.Sub(lastTime); deltaDuration > 0 {
			log.Ctx(ctx).Info("wait for block timestamp",
				zap.Time("blockTime", blockTime),
				zap.Time("lastTime", lastTime),
				zap.Duration("delta", deltaDuration))

			// Wait with a timer instead of time.Sleep so that a cancelled or
			// timed-out request does not keep this goroutine blocked.
			timer := time.NewTimer(deltaDuration + time.Millisecond*200)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return &rootcoordpb.AllocTimestampResponse{
					Status: merr.Status(ctx.Err()),
				}, nil
			}
		}
	}

	ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
	if err != nil {
		log.Ctx(ctx).Error("failed to allocate timestamp", zap.String("role", typeutil.RootCoordRole),
			zap.Error(err))

		return &rootcoordpb.AllocTimestampResponse{
			Status: merr.Status(err),
		}, nil
	}

	// Parse only after the error check: ts is meaningless when allocation failed.
	physicalTs, _ := tsoutil.ParseTS(ts)

	// return first available timestamp
	ts = ts - uint64(in.GetCount()) + 1
	metrics.RootCoordTimestamp.Set(float64(physicalTs.Unix()))
	return &rootcoordpb.AllocTimestampResponse{
		Status:    merr.Success(),
		Timestamp: ts,
		Count:     in.GetCount(),
	}, nil
}

// AllocID allocates in.Count IDs from the ID allocator and returns the start of
// the allocated range together with the count. Allocation failures are
// reported in the response status (with Count still set); the error result is
// always nil.
func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
	if healthErr := merr.CheckHealthy(c.GetStateCode()); healthErr != nil {
		return &rootcoordpb.AllocIDResponse{Status: merr.Status(healthErr)}, nil
	}

	count := in.Count
	start, _, err := c.idAllocator.Alloc(count)
	resp := &rootcoordpb.AllocIDResponse{Count: count}
	if err != nil {
		log.Ctx(ctx).Error("failed to allocate id",
			zap.String("role", typeutil.RootCoordRole),
			zap.Error(err))
		resp.Status = merr.Status(err)
		return resp, nil
	}

	metrics.RootCoordIDAllocCounter.Add(float64(count))
	resp.Status = merr.Success()
	resp.ID = start
	return resp, nil
}

// UpdateChannelTimeTick handles a ChannelTimeTickMsg received over gRPC and
// forwards it to c.chanTimeTick.
//
// The message must carry a base of type MsgType_TimeTick; anything else,
// including a message with no base at all, is rejected with a parameter-invalid
// status instead of being dereferenced. Failures are reported in the returned
// status; the error result is always nil.
func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
	log := log.Ctx(ctx)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Error(err))
		return merr.Status(err), nil
	}
	// Use the generated getters: they tolerate a nil message or nil Base, where
	// a direct in.Base.MsgType access would panic.
	msgType := in.GetBase().GetMsgType()
	if msgType != commonpb.MsgType_TimeTick {
		log.Warn("failed to updateTimeTick because base messasge is not timetick, state", zap.Any("base message type", msgType))
		return merr.Status(merr.WrapErrParameterInvalid(commonpb.MsgType_TimeTick.String(), msgType.String(), "invalid message type")), nil
	}
	err := c.chanTimeTick.updateTimeTick(in, "gRPC")
	if err != nil {
		log.Warn("failed to updateTimeTick",
			zap.String("role", typeutil.RootCoordRole),
			zap.Error(err))
		return merr.Status(err), nil
	}
	return merr.Success(), nil
}

// InvalidateCollectionMetaCache forwards the invalidation request to
// c.proxyClientManager after verifying that rootcoord is healthy.
// Any failure is reported through the returned Status; the returned error is
// always nil.
func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	if unhealthy := merr.CheckHealthy(c.GetStateCode()); unhealthy != nil {
		return merr.Status(unhealthy), nil
	}
	if err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in); err != nil {
		return merr.Status(err), nil
	}
	return merr.Success(), nil
}

// ShowConfigurations returns the "rootcoord" component configurations selected
// by req.Pattern as a list of key/value pairs.
// When rootcoord is not healthy only the Status is populated.
func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &internalpb.ShowConfigurationsResponse{Status: merr.Status(err)}, nil
	}

	matched := Params.GetComponentConfigurations("rootcoord", req.Pattern)
	// Always hand back a non-nil slice, sized for the number of matches.
	pairs := make([]*commonpb.KeyValuePair, 0, len(matched))
	for k, v := range matched {
		pairs = append(pairs, &commonpb.KeyValuePair{Key: k, Value: v})
	}

	return &internalpb.ShowConfigurationsResponse{
		Status:        merr.Success(),
		Configuations: pairs,
	}, nil
}

// GetMetrics executes the metrics request through c.metricsRequest and returns
// its result in Response, tagged with this node's component name.
// Failures are reported through the response Status; the returned error is
// always nil.
func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.GetMetricsResponse{Status: merr.Status(err)}, nil
	}

	componentName := metricsinfo.ConstructComponentName(typeutil.RootCoordRole, paramtable.GetNodeID())

	payload, err := c.metricsRequest.ExecuteMetricsRequest(ctx, in)
	if err != nil {
		// The component name is still reported when the request itself fails.
		return &milvuspb.GetMetricsResponse{
			Status:        merr.Status(err),
			ComponentName: componentName,
		}, nil
	}

	return &milvuspb.GetMetricsResponse{
		Status:        merr.Success(),
		Response:      payload,
		ComponentName: componentName,
	}, nil
}

// CreateAlias creates a collection alias by delegating to c.broadcastCreateAlias.
// An errIgnoredAlterAlias result is logged and reported as success.
// Other failures are reported through the returned Status; the returned error
// is always nil.
func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	const method = "CreateAlias"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("alias", in.GetAlias()),
		zap.String("collectionName", in.GetCollectionName()))
	logger.Info("received request to create alias")

	err := c.broadcastCreateAlias(ctx, in)
	switch {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
		logger.Info("done to create alias")
	case errors.Is(err, errIgnoredAlterAlias):
		// Nothing to change: counted as a success, but no latency is recorded.
		logger.Info("create alias already set on this collection, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	default:
		logger.Info("failed to create alias", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
	return merr.Success(), nil
}

// DropAlias removes a collection alias by delegating to c.broadcastDropAlias.
// A merr.ErrAliasNotFound result is logged and reported as success.
// Other failures are reported through the returned Status; the returned error
// is always nil.
func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	const method = "DropAlias"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("alias", in.GetAlias()))
	logger.Info("received request to drop alias")

	switch err := c.broadcastDropAlias(ctx, in); {
	case err == nil:
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
		logger.Info("done to drop alias")
	case errors.Is(err, merr.ErrAliasNotFound):
		// A missing alias is counted as a success, but no latency is recorded.
		logger.Info("drop alias not found, ignore it")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	default:
		logger.Info("failed to drop alias", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}
	return merr.Success(), nil
}

// AlterAlias changes a collection alias by delegating to c.broadcastAlterAlias.
// An errIgnoredAlterAlias result is logged and reported as success.
// Other failures are reported through the returned Status; the returned error
// is always nil.
func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	// A single label for every metric of this handler; the total counter was
	// previously recorded under "DropAlias" by mistake.
	const method = "AlterAlias"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
		zap.String("dbName", in.GetDbName()),
		zap.String("alias", in.GetAlias()),
		zap.String("collectionName", in.GetCollectionName()))
	logger.Info("received request to alter alias")

	if err := c.broadcastAlterAlias(ctx, in); err != nil {
		if errors.Is(err, errIgnoredAlterAlias) {
			// Nothing to change: counted as a success, but no latency is recorded.
			logger.Info("alter alias already set on this collection, ignore it")
			metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
			return merr.Success(), nil
		}
		logger.Info("failed to alter alias", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	logger.Info("done to alter alias")
	return merr.Success(), nil
}

// DescribeAlias looks up the collection that in.Alias refers to in database
// in.DbName via c.meta.DescribeAlias.
// An empty alias is rejected as a missing parameter. Failures are reported
// through the response Status and counted under the fail label; the returned
// error is always nil.
func (c *Core) DescribeAlias(ctx context.Context, in *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.DescribeAliasResponse{
			Status: merr.Status(err),
		}, nil
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("db", in.GetDbName()),
		zap.String("alias", in.GetAlias()))
	method := "DescribeAlias"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	log.Info("received request to describe alias")

	if in.GetAlias() == "" {
		// The request was already counted in the total, so record the failure too.
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.DescribeAliasResponse{
			Status: merr.Status(merr.WrapErrParameterMissing("alias", "no input alias")),
		}, nil
	}

	// NOTE(review): the trailing 0 is presumably a timestamp argument — confirm against c.meta.
	collectionName, err := c.meta.DescribeAlias(ctx, in.GetDbName(), in.GetAlias(), 0)
	if err != nil {
		log.Warn("fail to DescribeAlias", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.DescribeAliasResponse{
			Status: merr.Status(err),
		}, nil
	}
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))

	log.Info("done to describe alias")
	return &milvuspb.DescribeAliasResponse{
		Status:     merr.Status(nil),
		DbName:     in.GetDbName(),
		Alias:      in.GetAlias(),
		Collection: collectionName,
	}, nil
}

// ListAliases returns the aliases that c.meta.ListAliases reports for
// in.CollectionName in database in.DbName.
// Failures are reported through the response Status and counted under the
// fail label; the returned error is always nil.
func (c *Core) ListAliases(ctx context.Context, in *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.ListAliasesResponse{
			Status: merr.Status(err),
		}, nil
	}

	method := "ListAliases"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("db", in.GetDbName()),
		zap.String("collectionName", in.GetCollectionName()))
	log.Info("received request to list aliases")

	// NOTE(review): the trailing 0 is presumably a timestamp argument — confirm against c.meta.
	aliases, err := c.meta.ListAliases(ctx, in.GetDbName(), in.GetCollectionName(), 0)
	if err != nil {
		log.Warn("fail to ListAliases", zap.Error(err))
		// The request was already counted in the total, so record the failure too.
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.ListAliasesResponse{
			Status: merr.Status(err),
		}, nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))

	log.Info("done to list aliases")
	return &milvuspb.ListAliasesResponse{
		Status:         merr.Status(nil),
		DbName:         in.GetDbName(),
		CollectionName: in.GetCollectionName(),
		Aliases:        aliases,
	}, nil
}

// ExpireCredCache builds an InvalidateCredCacheRequest for username, stamped
// with this server's ID as the source, and sends it through
// c.proxyClientManager.InvalidateCredentialCache.
func (c *Core) ExpireCredCache(ctx context.Context, username string) error {
	base := commonpbutil.NewMsgBase(
		commonpbutil.WithSourceID(c.session.GetServerID()),
	)
	return c.proxyClientManager.InvalidateCredentialCache(ctx, &proxypb.InvalidateCredCacheRequest{
		Base:     base,
		Username: username,
	})
}

// UpdateCredCache builds an UpdateCredCacheRequest carrying credInfo.Username
// and credInfo.Sha256Password, stamped with this server's ID as the source,
// and sends it through c.proxyClientManager.UpdateCredentialCache.
func (c *Core) UpdateCredCache(ctx context.Context, credInfo *internalpb.CredentialInfo) error {
	base := commonpbutil.NewMsgBase(
		commonpbutil.WithSourceID(c.session.GetServerID()),
	)
	return c.proxyClientManager.UpdateCredentialCache(ctx, &proxypb.UpdateCredCacheRequest{
		Base:     base,
		Username: credInfo.Username,
		Password: credInfo.Sha256Password,
	})
}

// CreateCredential creates a new user by delegating to
// c.broadcastAlterUserForCreateCredential.
// Every failure is returned with ErrorCode_CreateCredentialFailure; an
// errUserAlreadyExists failure is replaced by a message naming the user.
// The returned error is always nil.
func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
	const method = "CreateCredential"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
	ctxLog.Debug(method)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	if err := c.broadcastAlterUserForCreateCredential(ctx, credInfo); err != nil {
		ctxLog.Warn("CreateCredential failed", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()

		reported := err
		if errors.Is(err, errUserAlreadyExists) {
			reported = errors.Errorf("user already exists: %s", credInfo.Username)
		}
		return merr.StatusWithErrorCode(reported, commonpb.ErrorCode_CreateCredentialFailure), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
	metrics.RootCoordNumOfCredentials.Inc()
	return merr.Success(), nil
}

// GetCredential reads the credential stored for in.Username through
// c.meta.GetCredential and returns its username and encrypted password.
// Lookup failures are returned with ErrorCode_GetCredentialFailure; the
// returned error is always nil.
func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
	const method = "GetCredential"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
	ctxLog.Debug(method)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &rootcoordpb.GetCredentialResponse{Status: merr.Status(err)}, nil
	}

	cred, err := c.meta.GetCredential(ctx, in.Username)
	if err != nil {
		ctxLog.Warn("GetCredential query credential failed", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		failure := merr.StatusWithErrorCode(err, commonpb.ErrorCode_GetCredentialFailure)
		return &rootcoordpb.GetCredentialResponse{Status: failure}, nil
	}
	ctxLog.Debug("GetCredential success")

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))

	resp := &rootcoordpb.GetCredentialResponse{Status: merr.Success()}
	resp.Username = cred.Username
	resp.Password = cred.EncryptedPassword
	return resp, nil
}

// UpdateCredential updates a user's credential by delegating to
// c.broadcastAlterUserForUpdateCredential.
// Failures are returned with ErrorCode_UpdateCredentialFailure; the returned
// error is always nil.
func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
	const method = "UpdateCredential"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username))
	ctxLog.Debug(method)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	err := c.broadcastAlterUserForUpdateCredential(ctx, credInfo)
	if err == nil {
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
		return merr.Success(), nil
	}

	ctxLog.Warn("UpdateCredential append message failed", zap.Error(err))
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
	return merr.StatusWithErrorCode(err, commonpb.ErrorCode_UpdateCredentialFailure), nil
}

// DeleteCredential deletes a user by delegating to
// c.broadcastDropUserForDeleteCredential.
// An errUserNotFound result is reported as success so that the call is
// idempotent for the client. Other failures are returned with
// ErrorCode_DeleteCredentialFailure; the returned error is always nil.
func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
	const method = "DeleteCredential"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username))
	ctxLog.Debug(method)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	switch err := c.broadcastDropUserForDeleteCredential(ctx, in); {
	case err == nil:
		ctxLog.Debug("DeleteCredential success")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
		// Only an actual deletion lowers the credential gauge.
		metrics.RootCoordNumOfCredentials.Dec()
	case errors.Is(err, errUserNotFound):
		// A missing user is not surfaced to the client.
		ctxLog.Info("DeleteCredential user not found, ignored")
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
		metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
	default:
		ctxLog.Warn("DeleteCredential append message failed", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_DeleteCredentialFailure), nil
	}
	return merr.Success(), nil
}

// ListCredUsers returns the usernames reported by
// c.meta.ListCredentialUsernames.
// Failures are reported through the response Status; the returned error is
// always nil.
func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
	const method = "ListCredUsers"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
	ctxLog.Debug(method)
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.ListCredUsersResponse{Status: merr.Status(err)}, nil
	}

	listed, err := c.meta.ListCredentialUsernames(ctx)
	if err != nil {
		ctxLog.Warn("ListCredUsers query usernames failed", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.ListCredUsersResponse{Status: merr.Status(err)}, nil
	}
	ctxLog.Debug("ListCredUsers success")

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))

	resp := &milvuspb.ListCredUsersResponse{Status: merr.Success()}
	resp.Usernames = listed.Usernames
	return resp, nil
}

// CreateRole creates a role.
// - check the node health
// - create the role through c.broadcastCreateRole
// - every failure is returned with ErrorCode_CreateRoleFailure; an
//   errRoleAlreadyExists failure is replaced by a message naming the entity
//
// The returned error is always nil.
func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
	const method = "CreateRole"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	timer := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	ctxLog.Debug(method + " begin")

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	if err := c.broadcastCreateRole(ctx, in); err != nil {
		ctxLog.Warn("fail to create role", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()

		reported := err
		if errors.Is(err, errRoleAlreadyExists) {
			reported = errors.Newf("role [%s] already exists", in.GetEntity())
		}
		return merr.StatusWithErrorCode(reported, commonpb.ErrorCode_CreateRoleFailure), nil
	}

	ctxLog.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(timer.ElapseSpan().Milliseconds()))
	metrics.RootCoordNumOfRoles.Inc()
	return merr.Success(), nil
}

// DropRole drops a role.
// - check the node health
// - drop the role through c.broadcastDropRole
// - every failure is counted under the fail label and returned with
//   ErrorCode_DropRoleFailure; an errRoleNotExists failure is replaced by a
//   generic "not found the role" message
//
// The returned error is always nil.
func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
	method := "DropRole"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.String("role_name", in.RoleName))
	ctxLog.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	if err := c.broadcastDropRole(ctx, in); err != nil {
		ctxLog.Warn("fail to drop role", zap.Error(err))
		// The request was already counted in the total, so record the failure too.
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		if errors.Is(err, errRoleNotExists) {
			return merr.StatusWithErrorCode(errors.New("not found the role, maybe the role isn't existed or internal system error"), commonpb.ErrorCode_DropRoleFailure), nil
		}
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_DropRoleFailure), nil
	}

	// ctxLog already carries role_name; adding it again would emit a duplicate key.
	ctxLog.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordNumOfRoles.Dec()
	return merr.Success(), nil
}

// OperateUserRole operates the relationship between a user and a role.
// - check the node health
// - apply the operation through c.broadcastOperateUserRole
// - every failure is counted under the fail label and returned with
//   ErrorCode_OperateUserRoleFailure; an errRoleNotExists failure is replaced
//   by a generic "not found the role" message
//
// Metrics are labelled "OperateUserRole-<type>", where <type> is in.Type.
// The returned error is always nil.
func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
	method := "OperateUserRole-" + in.Type.String()
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	ctxLog.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	if err := c.broadcastOperateUserRole(ctx, in); err != nil {
		ctxLog.Warn("fail to operate the user and role", zap.Error(err))
		// The request was already counted in the total, so record the failure too.
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		if errors.Is(err, errRoleNotExists) {
			return merr.StatusWithErrorCode(errors.New("not found the role, maybe the role isn't existed or internal system error"), commonpb.ErrorCode_OperateUserRoleFailure), nil
		}
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperateUserRoleFailure), nil
	}

	ctxLog.Info(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return merr.Success(), nil
}

// SelectRole selects roles.
// - check the node health
// - when in.Role is provided, look it up by name first; a
//   merr.ErrIoKeyNotFound result yields a successful response with no results
// - select the roles through c.meta.SelectRole
//
// Failures are counted under the fail label and returned with
// ErrorCode_SelectRoleFailure; the returned error is always nil.
func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
	method := "SelectRole"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	ctxLog.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.SelectRoleResponse{Status: merr.Status(err)}, nil
	}

	if in.Role != nil {
		if _, err := c.meta.SelectRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, false); err != nil {
			if errors.Is(err, merr.ErrIoKeyNotFound) {
				// An unknown role is answered successfully, so count it as a success.
				metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
				metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
				return &milvuspb.SelectRoleResponse{
					Status: merr.Success(),
				}, nil
			}
			errMsg := "fail to select the role to check the role name"
			ctxLog.Warn(errMsg, zap.Error(err))
			metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
			return &milvuspb.SelectRoleResponse{
				Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
			}, nil
		}
	}
	roleResults, err := c.meta.SelectRole(ctx, util.DefaultTenant, in.Role, in.IncludeUserInfo)
	if err != nil {
		errMsg := "fail to select the role"
		ctxLog.Warn(errMsg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.SelectRoleResponse{
			Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectRoleFailure),
		}, nil
	}

	ctxLog.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return &milvuspb.SelectRoleResponse{
		Status:  merr.Success(),
		Results: roleResults,
	}, nil
}

// SelectUser selects users.
// - check the node health
// - when in.User is provided, look it up by name first; a
//   merr.ErrIoKeyNotFound result yields a successful response with no results
// - select the users through c.meta.SelectUser
//
// Failures are counted under the fail label and returned with
// ErrorCode_SelectUserFailure; the returned error is always nil.
func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
	method := "SelectUser"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	ctxLog.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.SelectUserResponse{Status: merr.Status(err)}, nil
	}

	if in.User != nil {
		if _, err := c.meta.SelectUser(ctx, util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil {
			if errors.Is(err, merr.ErrIoKeyNotFound) {
				// An unknown user is answered successfully, so count it as a success.
				metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
				metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
				return &milvuspb.SelectUserResponse{
					Status: merr.Success(),
				}, nil
			}
			errMsg := "fail to select the user to check the username"
			// ctxLog already carries the "in" field; adding it again would emit a duplicate key.
			ctxLog.Warn(errMsg, zap.Error(err))
			metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
			return &milvuspb.SelectUserResponse{
				Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
			}, nil
		}
	}
	userResults, err := c.meta.SelectUser(ctx, util.DefaultTenant, in.User, in.IncludeRoleInfo)
	if err != nil {
		errMsg := "fail to select the user"
		ctxLog.Warn(errMsg, zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return &milvuspb.SelectUserResponse{
			Status: merr.StatusWithErrorCode(errors.New(errMsg), commonpb.ErrorCode_SelectUserFailure),
		}, nil
	}

	ctxLog.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return &milvuspb.SelectUserResponse{
		Status:  merr.Success(),
		Results: userResults,
	}, nil
}

// isValidRole reports whether entity names a role that c.meta.SelectRole can
// find in the default tenant.
// It returns an error when entity is nil, when its name is empty, or when the
// lookup fails; the lookup error itself is only logged and a generic
// "not found the role" error is returned in its place.
func (c *Core) isValidRole(ctx context.Context, entity *milvuspb.RoleEntity) error {
	if entity == nil {
		return errors.New("the role entity is nil")
	}
	if entity.Name == "" {
		return errors.New("the name in the role entity is empty")
	}
	if _, err := c.meta.SelectRole(ctx, util.DefaultTenant, &milvuspb.RoleEntity{Name: entity.Name}, false); err != nil {
		// Log through the request context so the entry carries the request's log fields.
		log.Ctx(ctx).Warn("fail to select the role", zap.String("role_name", entity.Name), zap.Error(err))
		return errors.New("not found the role, maybe the role isn't existed or internal system error")
	}
	return nil
}

// isValidObject checks that entity is non-nil and that entity.Name is a key of
// commonpb.ObjectType_value. The error for an unknown name lists the supported
// object types.
func (c *Core) isValidObject(entity *milvuspb.ObjectEntity) error {
	if entity == nil {
		return errors.New("the object entity is nil")
	}
	if _, known := commonpb.ObjectType_value[entity.Name]; known {
		return nil
	}
	return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", entity.Name, lo.Keys(commonpb.ObjectType_value))
}

// isValidPrivilege checks that privilegeName may be used on the given object
// type.
// The any-word privilege is always accepted. Custom privilege groups and the
// default (built-in) privilege group names are rejected. A defined built-in
// privilege is accepted only when it is listed under object in
// util.ObjectPrivileges; everything else is reported as not found.
func (c *Core) isValidPrivilege(ctx context.Context, privilegeName string, object string) error {
	if util.IsAnyWord(privilegeName) {
		return nil
	}

	isCustomGroup, err := c.meta.IsCustomPrivilegeGroup(ctx, privilegeName)
	switch {
	case err != nil:
		return err
	case isCustomGroup:
		return fmt.Errorf("can not operate the custom privilege group [%s]", privilegeName)
	case lo.Contains(Params.RbacConfig.GetDefaultPrivilegeGroupNames(), privilegeName):
		return fmt.Errorf("can not operate the built-in privilege group [%s]", privilegeName)
	}

	// check object privileges for built-in privileges
	if util.IsPrivilegeNameDefined(privilegeName) {
		allowed, known := util.ObjectPrivileges[object]
		if !known {
			return fmt.Errorf("not found the object type[name: %s], supported the object types: %v", object, lo.Keys(commonpb.ObjectType_value))
		}
		if lo.Contains(allowed, privilegeName) {
			return nil
		}
	}
	return fmt.Errorf("not found the privilege name[%s] in object[%s]", privilegeName, object)
}

// isValidPrivilegeV2 accepts privilegeName when it is the any-word, a custom
// privilege group known to c.meta, or a defined privilege name. A failure of
// the custom-group lookup is returned as is; any other name is reported as
// not found.
func (c *Core) isValidPrivilegeV2(ctx context.Context, privilegeName string) error {
	if util.IsAnyWord(privilegeName) {
		return nil
	}

	isCustomGroup, err := c.meta.IsCustomPrivilegeGroup(ctx, privilegeName)
	if err != nil {
		return err
	}
	if isCustomGroup || util.IsPrivilegeNameDefined(privilegeName) {
		return nil
	}
	return fmt.Errorf("not found the privilege name[%s]", privilegeName)
}

// OperatePrivilege operates a privilege (grant or revoke).
// - check the node health
// - apply the operation through c.broadcastOperatePrivilege
// - every failure is counted under the fail label and returned with
//   ErrorCode_OperatePrivilegeFailure
//
// The returned error is always nil.
func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
	method := "OperatePrivilege"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	ctxLog.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	if err := c.broadcastOperatePrivilege(ctx, in); err != nil {
		ctxLog.Warn("fail to execute task when operating the privilege", zap.Error(err))
		// The request was already counted in the total, so record the failure too.
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
	}

	ctxLog.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return merr.Success(), nil
}

// operatePrivilegeCommonCheck validates an OperatePrivilegeRequest, in order:
// the operation type is Grant or Revoke, the grant entity is present, its
// object and role are valid, and the grantor carries a non-empty user that
// c.meta.SelectUser can find plus a non-nil privilege.
// It returns the first violation found, or nil when the request passes.
func (c *Core) operatePrivilegeCommonCheck(ctx context.Context, in *milvuspb.OperatePrivilegeRequest) error {
	switch in.Type {
	case milvuspb.OperatePrivilegeType_Grant, milvuspb.OperatePrivilegeType_Revoke:
		// supported operation
	default:
		return errors.New(fmt.Sprintf("invalid operate privilege type, current type: %s, valid value: [%s, %s]", in.Type, milvuspb.OperatePrivilegeType_Grant, milvuspb.OperatePrivilegeType_Revoke))
	}
	if in.Entity == nil {
		return errors.New("the grant entity in the request is nil")
	}
	// The detailed object error is intentionally replaced by a generic message.
	if err := c.isValidObject(in.Entity.Object); err != nil {
		return errors.New("the object entity in the request is nil or invalid")
	}
	if err := c.isValidRole(ctx, in.Entity.Role); err != nil {
		return err
	}

	grantor := in.Entity.Grantor
	if grantor == nil {
		return errors.New("the grantor entity is nil")
	}
	if grantor.User == nil || grantor.User.Name == "" {
		return errors.New("the user entity in the grantor entity is nil or empty")
	}
	username := grantor.User.Name
	if _, err := c.meta.SelectUser(ctx, util.DefaultTenant, &milvuspb.UserEntity{Name: username}, false); err != nil {
		log.Ctx(ctx).Warn("fail to select the user", zap.String("username", username), zap.Error(err))
		return errors.New("not found the user, maybe the user isn't existed or internal system error")
	}
	if grantor.Privilege == nil {
		return errors.New("the privilege entity in the grantor entity is nil")
	}
	return nil
}

// validatePrivilegeGroupParams checks dbName and collectionName against the
// privilege level of the privilege group named entity.
// When entity is not a known (default or custom) group, or the group has no
// privileges, it is treated as a plain privilege and nil is returned. The level
// is taken from the first privilege of the group:
//   - cluster level: both dbName and collectionName must be the any-word
//   - database level: collectionName must be empty or the any-word
//   - collection level: a concrete collectionName requires a concrete dbName
//
// An unrecognized level is reported as an error.
func (c *Core) validatePrivilegeGroupParams(ctx context.Context, entity string, dbName string, collectionName string) error {
	allGroups, err := c.getDefaultAndCustomPrivilegeGroups(ctx)
	if err != nil {
		return err
	}
	privilegesByGroup := lo.SliceToMap(allGroups, func(g *milvuspb.PrivilegeGroupInfo) (string, []*milvuspb.PrivilegeEntity) {
		return g.GroupName, g.Privileges
	})
	members, isGroup := privilegesByGroup[entity]
	if !isGroup || len(members) == 0 {
		// it is a privilege, no need to check with other params
		return nil
	}

	// since all privileges are same level in a group, just check the first privilege
	switch util.GetPrivilegeLevel(members[0].GetName()) {
	case milvuspb.PrivilegeLevel_Cluster.String():
		if util.IsAnyWord(dbName) && util.IsAnyWord(collectionName) {
			return nil
		}
		return merr.WrapErrParameterInvalidMsg("dbName and collectionName should be * for the cluster level privilege: %s", entity)
	case milvuspb.PrivilegeLevel_Database.String():
		if collectionName == "" || collectionName == util.AnyWord {
			return nil
		}
		return merr.WrapErrParameterInvalidMsg("collectionName should be * for the database level privilege: %s", entity)
	case milvuspb.PrivilegeLevel_Collection.String():
		concreteCollection := !util.IsAnyWord(collectionName) && collectionName != ""
		if util.IsAnyWord(dbName) && concreteCollection {
			return merr.WrapErrParameterInvalidMsg("please specify database name for the collection level privilege: %s", entity)
		}
		return nil
	default:
		return errors.New("not found the privilege level")
	}
}

// getMetastorePrivilegeName maps privName to the name used in the metastore:
// the any-word is returned unchanged, a defined built-in privilege goes through
// util.PrivilegeNameForMetastore, and a custom privilege group goes through
// util.PrivilegeGroupNameForMetastore. Any other name yields an error.
func (c *Core) getMetastorePrivilegeName(ctx context.Context, privName string) (string, error) {
	switch {
	case util.IsAnyWord(privName):
		// '*' is stored as is
		return privName, nil
	case util.IsPrivilegeNameDefined(privName):
		// built-in privilege
		return util.PrivilegeNameForMetastore(privName), nil
	}

	// Anything else must be a custom privilege group to be accepted.
	isCustomGroup, err := c.meta.IsCustomPrivilegeGroup(ctx, privName)
	if err != nil {
		return "", err
	}
	if !isCustomGroup {
		return "", errors.Newf("not found the privilege name [%s] from metastore", privName)
	}
	return util.PrivilegeGroupNameForMetastore(privName), nil
}

// SelectGrant returns the grants matching the entity carried by the request.
//
// Steps:
//   - reject the call when the node is not healthy;
//   - reject a request without a grant entity;
//   - validate the role, and the object when one is provided;
//   - query the meta table for the matching grants.
//
// A merr.ErrIoKeyNotFound from the meta table is reported as success. Validation
// and lookup failures are reported with ErrorCode_SelectGrantFailure. The returned
// Go error is always nil; failures are carried in the response status.
func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
	const method = "SelectGrant"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	logger.Debug(method)

	// failWith builds the response shared by every SelectGrantFailure path.
	failWith := func(cause error) (*milvuspb.SelectGrantResponse, error) {
		return &milvuspb.SelectGrantResponse{
			Status: merr.StatusWithErrorCode(cause, commonpb.ErrorCode_SelectGrantFailure),
		}, nil
	}

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.SelectGrantResponse{Status: merr.Status(err)}, nil
	}

	entity := in.Entity
	if entity == nil {
		errMsg := "the grant entity in the request is nil"
		logger.Warn(errMsg)
		return failWith(errors.New(errMsg))
	}
	if err := c.isValidRole(ctx, entity.Role); err != nil {
		logger.Warn("", zap.Error(err))
		return failWith(err)
	}
	// The object is optional; it is only validated when the caller supplied one.
	if object := entity.Object; object != nil {
		if err := c.isValidObject(object); err != nil {
			logger.Warn("", zap.Error(err))
			return failWith(err)
		}
	}

	grantEntities, err := c.meta.SelectGrant(ctx, util.DefaultTenant, entity)
	switch {
	case errors.Is(err, merr.ErrIoKeyNotFound):
		// A missing key is answered with a successful status.
		return &milvuspb.SelectGrantResponse{
			Status:   merr.Success(),
			Entities: grantEntities,
		}, nil
	case err != nil:
		errMsg := "fail to select the grant"
		logger.Warn(errMsg, zap.Error(err))
		return failWith(errors.New(errMsg))
	}

	logger.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return &milvuspb.SelectGrantResponse{
		Status:   merr.Success(),
		Entities: grantEntities,
	}, nil
}

// ListPolicy returns the RBAC policy strings, the user-role bindings and the
// privilege groups known to the meta table.
//
// Stored grants are expanded through expandPrivilegeGroups, using both the default
// and the custom privilege groups, before being rendered with
// funcutil.PolicyForPrivilege. Any failure after the health check is reported with
// ErrorCode_ListPolicyFailure. The returned Go error is always nil.
func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
	const method = "PolicyList"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	logger.Debug(method)

	// fail logs the cause (after any extra fields) and wraps it into a
	// ListPolicyFailure response whose message is "<msg>: <cause>".
	fail := func(msg string, cause error, extra ...zap.Field) (*internalpb.ListPolicyResponse, error) {
		logger.Error(msg, append(extra, zap.Error(cause))...)
		return &internalpb.ListPolicyResponse{
			Status: merr.StatusWithErrorCode(fmt.Errorf("%s: %s", msg, cause.Error()), commonpb.ErrorCode_ListPolicyFailure),
		}, nil
	}

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &internalpb.ListPolicyResponse{Status: merr.Status(err)}, nil
	}

	policies, err := c.meta.ListPolicy(ctx, util.DefaultTenant)
	if err != nil {
		return fail("fail to list policy", err)
	}

	// expand privilege groups and turn to policies
	allGroups, err := c.getDefaultAndCustomPrivilegeGroups(ctx)
	if err != nil {
		return fail("fail to get privilege groups", err)
	}
	privilegesByGroup := make(map[string][]*milvuspb.PrivilegeEntity, len(allGroups))
	for _, group := range allGroups {
		privilegesByGroup[group.GroupName] = group.Privileges
	}
	expandGrants, err := c.expandPrivilegeGroups(ctx, policies, privilegesByGroup)
	if err != nil {
		return fail("fail to expand privilege groups", err)
	}
	expandPolicies := make([]string, 0, len(expandGrants))
	for _, grant := range expandGrants {
		policy := funcutil.PolicyForPrivilege(grant.Role.Name, grant.Object.Name, grant.ObjectName, grant.Grantor.Privilege.Name, grant.DbName)
		expandPolicies = append(expandPolicies, policy)
	}

	userRoles, err := c.meta.ListUserRole(ctx, util.DefaultTenant)
	if err != nil {
		return fail("fail to list user-role", err, zap.Any("in", in))
	}

	logger.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return &internalpb.ListPolicyResponse{
		Status:          merr.Success(),
		PolicyInfos:     expandPolicies,
		UserRoles:       userRoles,
		PrivilegeGroups: allGroups,
	}, nil
}

// BackupRBAC returns the RBAC meta that the meta table produces for the default tenant.
//
// Both an unhealthy node and a meta table failure are reported through the
// response status; the returned Go error is always nil.
func (c *Core) BackupRBAC(ctx context.Context, in *milvuspb.BackupRBACMetaRequest) (*milvuspb.BackupRBACMetaResponse, error) {
	const method = "BackupRBAC"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	logger.Debug(method)

	// failed wraps an error into the response status without an explicit error code.
	failed := func(cause error) (*milvuspb.BackupRBACMetaResponse, error) {
		return &milvuspb.BackupRBACMetaResponse{Status: merr.Status(cause)}, nil
	}

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return failed(err)
	}

	rbacMeta, err := c.meta.BackupRBAC(ctx, util.DefaultTenant)
	if err != nil {
		return failed(err)
	}

	logger.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))

	return &milvuspb.BackupRBACMetaResponse{
		Status:   merr.Success(),
		RBACMeta: rbacMeta,
	}, nil
}

// RestoreRBAC restores RBAC meta by delegating to broadcastRestoreRBACV2.
//
// An errEmptyRBACMeta result is logged and answered with a successful status.
// Any other failure is reported with ErrorCode_OperatePrivilegeFailure. The
// returned Go error is always nil.
func (c *Core) RestoreRBAC(ctx context.Context, in *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) {
	const method = "RestoreRBAC"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole))
	logger.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	err := c.broadcastRestoreRBACV2(ctx, in)
	switch {
	case errors.Is(err, errEmptyRBACMeta):
		// Nothing to restore: answered as success, without the success metrics below.
		logger.Info("restoring rbac meta is empty, skip restore", zap.Error(err))
		return merr.Success(), nil
	case err != nil:
		logger.Warn("fail to execute task when restore rbac meta data", zap.Error(err))
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeFailure), nil
	}

	logger.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return merr.Success(), nil
}

// RenameCollection renames a collection by delegating to
// broadcastAlterCollectionForRenameCollection.
//
// An errIgnoredAlterCollection result is logged and answered with a successful
// status. Any other failure increments the fail counter and is returned in the
// status. The returned Go error is always nil.
func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
	const method = "RenameCollection"

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.RootCoordRole),
		zap.String("oldDbName", req.GetDbName()),
		zap.String("newDbName", req.GetNewDBName()),
		zap.String("oldCollectionName", req.GetOldName()),
		zap.String("newCollectionName", req.GetNewName()),
	)
	logger.Info("received request to rename collection")

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)

	err := c.broadcastAlterCollectionForRenameCollection(ctx, req)
	switch {
	case err == nil:
		// fall through to the success bookkeeping below
	case errors.Is(err, errIgnoredAlterCollection):
		logger.Info("rename collection ignored, collection already uses the new name")
		return merr.Success(), nil
	default:
		logger.Warn("failed to rename collection", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return merr.Status(err), nil
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	logger.Info("done to rename collection")
	return merr.Success(), nil
}

// DescribeDatabase runs a describeDBTask through the scheduler and returns the
// response the task produced.
//
// Failures to enqueue the task or to finish it increment the fail counter and are
// reported through the response status. The returned Go error is always nil.
func (c *Core) DescribeDatabase(ctx context.Context, req *rootcoordpb.DescribeDatabaseRequest) (*rootcoordpb.DescribeDatabaseResponse, error) {
	const method = "DescribeDatabase"

	// failed wraps an error into a response that carries only the status.
	failed := func(cause error) (*rootcoordpb.DescribeDatabaseResponse, error) {
		return &rootcoordpb.DescribeDatabaseResponse{Status: merr.Status(cause)}, nil
	}

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return failed(err)
	}

	logger := log.Ctx(ctx).With(zap.String("dbName", req.GetDbName()))
	logger.Info("received request to describe database ")

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	task := &describeDBTask{
		baseTask: newBaseTask(ctx, c),
		Req:      req,
	}

	if err := c.scheduler.AddTask(task); err != nil {
		logger.Warn("failed to enqueue request to describe database", zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return failed(err)
	}

	if err := task.WaitToFinish(); err != nil {
		logger.Warn("failed to describe database", zap.Uint64("ts", task.GetTs()), zap.Error(err))
		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
		return failed(err)
	}

	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))

	logger.Info("done to describe database", zap.Uint64("ts", task.GetTs()))
	return task.Rsp, nil
}

// CheckHealth reports whether this coordinator and the proxies it knows are healthy.
//
// When the coordinator itself is not healthy the response carries that error in
// both Status and Reasons. Otherwise the following checks run concurrently in an
// errgroup:
//   - every proxy client is asked for its component states, which are then
//     evaluated with merr.AnalyzeState;
//   - when the configured max time-tick delay is positive, CheckTimeTickLagExceeded
//     is run against the mix coordinator.
//
// Every failing check records its own error; if any check fails the response has
// IsHealthy=false, a successful Status, and one reason per recorded error. The
// returned Go error is always nil.
func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.CheckHealthResponse{
			Status:    merr.Status(err),
			IsHealthy: false,
			Reasons:   []string{fmt.Sprintf("serverID=%d: %v", c.session.GetServerID(), err)},
		}, nil
	}

	group, ctx := errgroup.WithContext(ctx)
	// errs gathers the error of every failing check; group.Wait only returns the first one.
	errs := typeutil.NewConcurrentSet[error]()

	proxyClients := c.proxyClientManager.GetProxyClients()
	proxyClients.Range(func(key int64, value types.ProxyClient) bool {
		nodeID := key
		proxyClient := value
		group.Go(func() error {
			sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
			if err != nil {
				errs.Insert(err)
				return err
			}

			err = merr.AnalyzeState("Proxy", nodeID, sta)
			if err != nil {
				errs.Insert(err)
			}

			return err
		})
		return true
	})

	maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
	if maxDelay > 0 {
		group.Go(func() error {
			err := CheckTimeTickLagExceeded(ctx, c.mixCoord, maxDelay)
			if err != nil {
				errs.Insert(err)
			}
			return err
		})
	}

	if err := group.Wait(); err != nil {
		return &milvuspb.CheckHealthResponse{
			Status:    merr.Success(),
			IsHealthy: false,
			// Report each collected error, not the single error returned by Wait.
			Reasons: lo.Map(errs.Collect(), func(e error, _ int) string {
				return e.Error()
			}),
		}, nil
	}

	return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
}

// CreatePrivilegeGroup creates a privilege group by delegating to
// broadcastCreatePrivilegeGroup.
//
// Both an unhealthy node and a failed broadcast are reported with
// ErrorCode_CreatePrivilegeGroupFailure. On success the privilege group gauge is
// incremented. The returned Go error is always nil.
func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
	const method = "CreatePrivilegeGroup"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	logger.Debug(method)

	// failed tags every error of this RPC with the same error code.
	failed := func(cause error) (*commonpb.Status, error) {
		return merr.StatusWithErrorCode(cause, commonpb.ErrorCode_CreatePrivilegeGroupFailure), nil
	}

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return failed(err)
	}

	if err := c.broadcastCreatePrivilegeGroup(ctx, in); err != nil {
		logger.Warn("fail to create privilege group", zap.Error(err))
		return failed(err)
	}

	logger.Info(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordNumOfPrivilegeGroups.Inc()
	return merr.Success(), nil
}

// DropPrivilegeGroup drops a privilege group by delegating to
// broadcastDropPrivilegeGroup.
//
// An errNotCustomPrivilegeGroup result is logged and answered with a successful
// status without touching the metrics. Both an unhealthy node and any other
// broadcast failure are reported with ErrorCode_DropPrivilegeGroupFailure. On a
// real drop the privilege group gauge is decremented, mirroring the increment done
// by CreatePrivilegeGroup. The returned Go error is always nil.
func (c *Core) DropPrivilegeGroup(ctx context.Context, in *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) {
	method := "DropPrivilegeGroup"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	ctxLog := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	ctxLog.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_DropPrivilegeGroupFailure), nil
	}

	if err := c.broadcastDropPrivilegeGroup(ctx, in); err != nil {
		if errors.Is(err, errNotCustomPrivilegeGroup) {
			ctxLog.Info("privilege group is not custom privilege group, skip drop", zap.Error(err))
			return merr.Success(), nil
		}
		ctxLog.Warn("fail to drop privilege group", zap.Error(err))
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_DropPrivilegeGroupFailure), nil
	}

	ctxLog.Info(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	// Dec, not Desc: Desc only returns the metric descriptor and leaves the value untouched.
	metrics.RootCoordNumOfPrivilegeGroups.Dec()
	return merr.Success(), nil
}

// ListPrivilegeGroups returns the privilege groups stored in the meta table
// followed by the default privilege groups from the RBAC configuration.
//
// A meta table failure is reported with ErrorCode_ListPrivilegeGroupsFailure. The
// returned Go error is always nil.
func (c *Core) ListPrivilegeGroups(ctx context.Context, in *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) {
	const method = "ListPrivilegeGroups"
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	logger.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return &milvuspb.ListPrivilegeGroupsResponse{Status: merr.Status(err)}, nil
	}

	groups, err := c.meta.ListPrivilegeGroups(ctx)
	if err != nil {
		logger.Warn("fail to list privilege group", zap.Error(err))
		failure := merr.StatusWithErrorCode(err, commonpb.ErrorCode_ListPrivilegeGroupsFailure)
		return &milvuspb.ListPrivilegeGroupsResponse{Status: failure}, nil
	}

	logger.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))

	// The built-in groups are appended after the stored ones.
	groups = append(groups, Params.RbacConfig.GetDefaultPrivilegeGroups()...)
	return &milvuspb.ListPrivilegeGroupsResponse{
		Status:          merr.Success(),
		PrivilegeGroups: groups,
	}, nil
}

// OperatePrivilegeGroup applies the requested operation to a privilege group by
// delegating to broadcastOperatePrivilegeGroup.
//
// The metric and log label is "OperatePrivilegeGroup-" followed by the operation
// type of the request. A failed broadcast is reported with
// ErrorCode_OperatePrivilegeGroupFailure. The returned Go error is always nil.
func (c *Core) OperatePrivilegeGroup(ctx context.Context, in *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error) {
	method := "OperatePrivilegeGroup-" + in.Type.String()
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder(method)
	logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), zap.Any("in", in))
	logger.Debug(method)

	if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
		return merr.Status(err), nil
	}

	err := c.broadcastOperatePrivilegeGroup(ctx, in)
	if err != nil {
		logger.Warn("fail to execute task when operate privilege group", zap.Error(err))
		return merr.StatusWithErrorCode(err, commonpb.ErrorCode_OperatePrivilegeGroupFailure), nil
	}

	logger.Debug(method + " success")
	metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
	metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return merr.Success(), nil
}

// expandPrivilegeGroups replaces every grant whose privilege name is a key of
// groups with one grant per privilege in that group; other grants are kept as a
// single grant.
//
// Each produced grant carries the metastore privilege name resolved by
// getMetastorePrivilegeName and the object type returned by util.GetObjectType.
// For the Global object type the object name is forced to the wildcard. The result
// is de-duplicated by role, object, object name, grantor user, privilege name and
// database name. The first resolution error aborts the expansion.
func (c *Core) expandPrivilegeGroups(ctx context.Context, grants []*milvuspb.GrantEntity, groups map[string][]*milvuspb.PrivilegeEntity) ([]*milvuspb.GrantEntity, error) {
	expanded := make([]*milvuspb.GrantEntity, 0, len(grants))

	for _, grant := range grants {
		grantedName := grant.Grantor.Privilege.Name
		members, isGroup := groups[grantedName]
		if !isGroup {
			// A plain privilege is treated as a group containing only itself.
			members = []*milvuspb.PrivilegeEntity{{Name: grantedName}}
		}

		for _, member := range members {
			metaName, err := c.getMetastorePrivilegeName(ctx, member.Name)
			if err != nil {
				return nil, err
			}

			object := &milvuspb.ObjectEntity{Name: util.GetObjectType(member.Name)}
			objectName := grant.ObjectName
			if object.Name == commonpb.ObjectType_Global.String() {
				objectName = util.AnyWord
			}

			expanded = append(expanded, &milvuspb.GrantEntity{
				Role:       grant.Role,
				Object:     object,
				ObjectName: objectName,
				Grantor: &milvuspb.GrantorEntity{
					User:      grant.Grantor.User,
					Privilege: &milvuspb.PrivilegeEntity{Name: metaName},
				},
				DbName: grant.DbName,
			})
		}
	}

	// uniq by role + object + object name + grantor user + privilege name + db name
	dedupKey := func(g *milvuspb.GrantEntity) string {
		return fmt.Sprintf("%s-%s-%s-%s-%s-%s", g.Role, g.Object, g.ObjectName, g.Grantor.User, g.Grantor.Privilege.Name, g.DbName)
	}
	return lo.UniqBy(expanded, dedupKey), nil
}

// getDefaultAndCustomPrivilegeGroups returns the privilege groups stored in the
// meta table followed by the default privilege groups from the RBAC configuration.
//
// If the meta table lookup fails, the error is returned with a nil slice and the
// default groups are not consulted.
func (c *Core) getDefaultAndCustomPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupInfo, error) {
	allGroups, err := c.meta.ListPrivilegeGroups(ctx)
	// Check the error before touching the result of the failed call.
	if err != nil {
		return nil, err
	}
	allGroups = append(allGroups, Params.RbacConfig.GetDefaultPrivilegeGroups()...)
	return allGroups, nil
}

// getCurrentUserVisibleDatabases returns the set of database names the current
// user has grants on. A set containing the wildcard means every database.
//
// The wildcard is returned when:
//   - authorization is disabled;
//   - the current user cannot be read from the context;
//   - the user is root and root is not required to bind a role;
//   - the user holds the admin role;
//   - any grant of the user's roles targets the wildcard database.
//
// The public role is skipped. A user unknown to the meta table gets an empty set.
func (c *Core) getCurrentUserVisibleDatabases(ctx context.Context) (typeutil.Set[string], error) {
	visible := typeutil.NewSet[string]()
	// allowAll marks every database as visible and ends the lookup.
	allowAll := func() (typeutil.Set[string], error) {
		visible.Insert(util.AnyWord)
		return visible, nil
	}

	if !Params.CommonCfg.AuthorizationEnabled.GetAsBool() {
		return allowAll()
	}

	curUser, err := contextutil.GetCurUserFromContext(ctx)
	if err != nil {
		// it will fail if the inner node server use the list database API
		log.Ctx(ctx).Warn("get current user from context failed", zap.Error(err))
		return allowAll()
	}
	if curUser == util.UserRoot && !Params.CommonCfg.RootShouldBindRole.GetAsBool() {
		return allowAll()
	}

	users, err := c.meta.SelectUser(ctx, "", &milvuspb.UserEntity{Name: curUser}, true)
	if err != nil {
		return nil, err
	}
	if len(users) == 0 {
		return visible, nil
	}

	for _, role := range users[0].Roles {
		switch role.GetName() {
		case util.RoleAdmin:
			return allowAll()
		case util.RolePublic:
			continue
		}

		grants, err := c.meta.SelectGrant(ctx, "", &milvuspb.GrantEntity{
			Role:   role,
			DbName: util.AnyWord,
		})
		if err != nil {
			return nil, err
		}
		for _, grant := range grants {
			dbName := grant.GetDbName()
			visible.Insert(dbName)
			// Once the wildcard is in the set nothing more can be learned.
			if dbName == util.AnyWord {
				return visible, nil
			}
		}
	}
	return visible, nil
}

// isVisibleDatabaseForCurUser reports whether currentDatabase is covered by
// visibleDatabases, either through the wildcard entry or by name.
func isVisibleDatabaseForCurUser(currentDatabase string, visibleDatabases typeutil.Set[string]) bool {
	return visibleDatabases.Contain(util.AnyWord) || visibleDatabases.Contain(currentDatabase)
}

// getCurrentUserVisibleCollections returns the set of collection names in
// databaseName that the current user has grants on. A set containing the wildcard
// means every collection.
//
// The wildcard is returned when:
//   - authorization is disabled;
//   - the current user cannot be read from the context;
//   - the user is root and root is not required to bind a role;
//   - the user holds the admin role;
//   - a grant on the Global object type carries the PrivilegeAll privilege;
//   - a qualifying grant targets the wildcard object name.
//
// A grant qualifies when its object type is Collection, or when its privilege is a
// custom privilege group or a collection-level built-in privilege group. The
// public role is skipped. A user unknown to the meta table gets an empty set.
func (c *Core) getCurrentUserVisibleCollections(ctx context.Context, databaseName string) (typeutil.Set[string], error) {
	visible := typeutil.NewSet[string]()
	// allowAll marks every collection as visible and ends the lookup.
	allowAll := func() (typeutil.Set[string], error) {
		visible.Insert(util.AnyWord)
		return visible, nil
	}

	if !Params.CommonCfg.AuthorizationEnabled.GetAsBool() {
		return allowAll()
	}

	curUser, err := contextutil.GetCurUserFromContext(ctx)
	if err != nil {
		log.Ctx(ctx).Warn("get current user from context failed", zap.Error(err))
		return allowAll()
	}
	if curUser == util.UserRoot && !Params.CommonCfg.RootShouldBindRole.GetAsBool() {
		return allowAll()
	}

	users, err := c.meta.SelectUser(ctx, "", &milvuspb.UserEntity{Name: curUser}, true)
	if err != nil {
		return nil, err
	}
	if len(users) == 0 {
		return visible, nil
	}

	// Get all custom privilege groups
	customGroups, err := c.meta.ListPrivilegeGroups(ctx)
	if err != nil {
		return nil, err
	}
	customGroupNames := make(map[string]struct{}, len(customGroups))
	for _, group := range customGroups {
		customGroupNames[group.GroupName] = struct{}{}
	}

	allPrivilege := func() string {
		return util.PrivilegeNameForAPI(commonpb.ObjectPrivilege_PrivilegeAll.String())
	}

	for _, role := range users[0].Roles {
		switch role.GetName() {
		case util.RoleAdmin:
			return allowAll()
		case util.RolePublic:
			continue
		}

		grants, err := c.meta.SelectGrant(ctx, "", &milvuspb.GrantEntity{
			Role:   role,
			DbName: databaseName,
		})
		if err != nil {
			return nil, err
		}
		for _, grant := range grants {
			objectType := grant.GetObject().GetName()
			priv := grant.GetGrantor().GetPrivilege().GetName()

			if objectType == commonpb.ObjectType_Global.String() && priv == allPrivilege() {
				return allowAll()
			}

			// should list collection level built-in privilege group or custom privilege group objects
			if objectType != commonpb.ObjectType_Collection.String() {
				if _, isCustom := customGroupNames[priv]; !isCustom && !Params.RbacConfig.IsCollectionPrivilegeGroup(priv) {
					continue
				}
			}

			collectionName := grant.GetObjectName()
			visible.Insert(collectionName)
			// Once the wildcard is in the set nothing more can be learned.
			if collectionName == util.AnyWord {
				return visible, nil
			}
		}
	}
	return visible, nil
}

// isVisibleCollectionForCurUser reports whether collectionName is covered by
// visibleCollections, either through the wildcard entry or by name.
func isVisibleCollectionForCurUser(collectionName string, visibleCollections typeutil.Set[string]) bool {
	return visibleCollections.Contain(util.AnyWord) || visibleCollections.Contain(collectionName)
}

// GetQuotaMetrics returns the quota metrics currently held by the quota center.
// Neither ctx nor req is consulted, and the returned error is always nil.
func (c *Core) GetQuotaMetrics(ctx context.Context, req *internalpb.GetQuotaMetricsRequest) (*internalpb.GetQuotaMetricsResponse, error) {
	quotaMetrics := c.quotaCenter.getQuotaMetrics()
	return quotaMetrics, nil
}
